You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by mm...@apache.org on 2022/08/09 22:56:12 UTC
[ignite] branch master updated: IGNITE-17316 Add pluggable affinity mapper to the thin client partition awareness feature (#10140)
This is an automated email from the ASF dual-hosted git repository.
mmuzaf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new daa397ccb09 IGNITE-17316 Add pluggable affinity mapper to the thin client partition awareness feature (#10140)
daa397ccb09 is described below
commit daa397ccb09a82fbf41367bc6725cf8fd0e841fd
Author: Maxim Muzafarov <ma...@gmail.com>
AuthorDate: Wed Aug 10 01:56:04 2022 +0300
IGNITE-17316 Add pluggable affinity mapper to the thin client partition awareness feature (#10140)
---
.../org/apache/ignite/snippets/JavaThinClient.java | 24 +++
docs/_docs/thin-clients/java-thin-client.adoc | 8 +
...ientPartitionAwarenessMapperAPITestWrapper.java | 91 +++++++++
.../clients/JavaThinCompatibilityTest.java | 28 ++-
.../client/ClientPartitionAwarenessMapper.java | 43 +++++
.../ClientPartitionAwarenessMapperFactory.java | 48 +++++
.../ignite/configuration/ClientConfiguration.java | 28 +++
.../client/thin/ClientCacheAffinityContext.java | 208 +++++++++++++++++----
.../client/thin/ClientCacheAffinityMapping.java | 92 +++++++--
.../client/thin/ProtocolBitmaskFeature.java | 5 +-
.../internal/client/thin/ReliableChannel.java | 29 ++-
.../internal/client/thin/TcpClientCache.java | 2 +
.../internal/client/thin/TcpIgniteClient.java | 13 +-
.../platform/client/ClientBitmaskFeature.java | 5 +-
.../platform/client/ClientMessageParser.java | 2 +-
.../cache/ClientCachePartitionAwarenessGroup.java | 87 +++++----
.../client/cache/ClientCachePartitionMapping.java | 27 ++-
.../client/cache/ClientCachePartitionsRequest.java | 164 ++++++++--------
.../cache/ClientCachePartitionsResponse.java | 8 +-
...lientPartitionAwarenessResourceReleaseTest.java | 80 ++++++++
...ClientPartitionAwarenessStableTopologyTest.java | 28 ++-
21 files changed, 817 insertions(+), 203 deletions(-)
diff --git a/docs/_docs/code-snippets/java/src/main/java/org/apache/ignite/snippets/JavaThinClient.java b/docs/_docs/code-snippets/java/src/main/java/org/apache/ignite/snippets/JavaThinClient.java
index d3ba1c510e7..c11241a4ae8 100644
--- a/docs/_docs/code-snippets/java/src/main/java/org/apache/ignite/snippets/JavaThinClient.java
+++ b/docs/_docs/code-snippets/java/src/main/java/org/apache/ignite/snippets/JavaThinClient.java
@@ -365,6 +365,30 @@ public class JavaThinClient {
//end::partition-awareness[]
}
+ void partitionAwarenessWithCustomMapper() throws Exception {
+ //tag::partition-awareness-with-mapper[]
+ // Partition awarenes is enabled by default since Apache Ignite 2.11 release.
+ ClientConfiguration cfg = new ClientConfiguration()
+ .setAddresses("node1_address:10800", "node2_address:10800", "node3_address:10800")
+ .setPartitionAwarenessMapperFactory(new ClientPartitionAwarenessMapperFactory() {
+ /** {@inheritDoc} */
+ @Override public ClientPartitionAwarenessMapper create(String cacheName, int partitions) {
+ AffinityFunction aff = new RendezvousAffinityFunction(false, partitions);
+
+ return aff::partition;
+ }
+ })
+
+ try (IgniteClient client = Ignition.startClient(cfg)) {
+ ClientCache<Integer, String> cache = client.cache(PART_CUSTOM_AFFINITY_CACHE_NAME);
+ // Put, get or remove data from the cache, partition awarenes will be enabled.
+ }
+ catch (ClientException e) {
+ System.err.println(e.getMessage());
+ }
+ //end::partition-awareness-with-mapper[]
+ }
+
private String[] fetchServerAddresses() {
return null;
}
diff --git a/docs/_docs/thin-clients/java-thin-client.adoc b/docs/_docs/thin-clients/java-thin-client.adoc
index f430c301c34..1469e446645 100644
--- a/docs/_docs/thin-clients/java-thin-client.adoc
+++ b/docs/_docs/thin-clients/java-thin-client.adoc
@@ -86,6 +86,14 @@ The following code sample illustrates how to use the partition awareness feature
include::{sourceCodeFile}[tag=partition-awareness,indent=0]
----
+The code sample below shows how to use a custom cache key to partition mapping function to enable affinity awareness on
+a thin client side if the cache already exists in a cluster or/and was created with custom AffinityFunction or AffinityKeyMapper.
+
+[source, java]
+----
+include::{sourceCodeFile}[tag=partition-awareness-with-mapper,indent=0]
+----
+
If a list of server nodes is dynamically changing or scaling, then it is possible to configure the connection with custom implementation of `ClientAddressFinder`. It should provide a number of current server addresses every time a client asks for them.
The following code sample illustrates how to use it.
diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/ClientPartitionAwarenessMapperAPITestWrapper.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/ClientPartitionAwarenessMapperAPITestWrapper.java
new file mode 100644
index 00000000000..f6f125af59f
--- /dev/null
+++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/ClientPartitionAwarenessMapperAPITestWrapper.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compatibility.clients;
+
+import java.io.Serializable;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
+import org.apache.ignite.client.ClientPartitionAwarenessMapper;
+import org.apache.ignite.client.ClientPartitionAwarenessMapperFactory;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lang.IgniteProductVersion;
+import static org.apache.ignite.compatibility.clients.JavaThinCompatibilityTest.ADDR;
+import static org.apache.ignite.compatibility.clients.JavaThinCompatibilityTest.CACHE_WITH_CUSTOM_AFFINITY;
+import static org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.ALL_AFFINITY_MAPPINGS;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This class is used to test a new API for partition awareness mapper factory added since 2.14 release.
+ *
+ * This wrapper is required to solve serialization/deserialization issues when the
+ * {@link JavaThinCompatibilityTest#testClient(IgniteProductVersion, IgniteProductVersion)} is used upon previous
+ * Ignite releases. The newly added classes must not be loaded by default when the test method is deserialized.
+ */
+public class ClientPartitionAwarenessMapperAPITestWrapper implements Serializable {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ public static void testCustomPartitionAwarenessMapper() {
+ X.println(">>>> Testing custom partition awareness mapper");
+
+ ClientConfiguration cfg = new ClientConfiguration()
+ .setAddresses(ADDR)
+ .setPartitionAwarenessMapperFactory(new ClientPartitionAwarenessMapperFactory() {
+ /** {@inheritDoc} */
+ @Override public ClientPartitionAwarenessMapper create(String cacheName, int partitions) {
+ AffinityFunction aff = new RendezvousAffinityFunction(false, partitions);
+
+ return aff::partition;
+ }
+ });
+
+ try (IgniteClient client = Ignition.startClient(cfg)) {
+ ClientCache<Integer, Integer> cache = client.cache(CACHE_WITH_CUSTOM_AFFINITY);
+
+ assertEquals(CACHE_WITH_CUSTOM_AFFINITY, cache.getName());
+ assertEquals(Integer.valueOf(0), cache.get(0));
+ }
+ }
+
+ /** */
+ public static void testCustomPartitionAwarenessMapperThrows() {
+ X.println(">>>> Testing custom partition awareness mapper throws");
+
+ ClientConfiguration cfg = new ClientConfiguration()
+ .setAddresses(ADDR)
+ .setPartitionAwarenessMapperFactory((cacheName, parts) -> null);
+
+ try (IgniteClient client = Ignition.startClient(cfg)) {
+ String errMsg = "Feature " + ALL_AFFINITY_MAPPINGS.name() + " is not supported by the server";
+
+ Throwable err = assertThrowsWithCause(
+ () -> client.cache(CACHE_WITH_CUSTOM_AFFINITY).get(0),
+ ClientFeatureNotSupportedByServerException.class
+ );
+
+ assertEquals(errMsg, err.getMessage());
+ }
+ }
+}
diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
index 7b725088f2f..9e4d79d3ced 100644
--- a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
+++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
@@ -28,12 +28,14 @@ import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteBinary;
+import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.client.ClientCache;
@@ -47,6 +49,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -69,7 +72,6 @@ import org.jetbrains.annotations.Nullable;
import org.junit.Assume;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-
import static org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.GET_SERVICE_DESCRIPTORS;
import static org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.SERVICE_INVOKE_CALLCTX;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
@@ -81,7 +83,10 @@ import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCaus
@RunWith(Parameterized.class)
public class JavaThinCompatibilityTest extends AbstractClientCompatibilityTest {
/** Thin client endpoint. */
- private static final String ADDR = "127.0.0.1:10800";
+ public static final String ADDR = "127.0.0.1:10800";
+
+ /** Cache name. */
+ public static final String CACHE_WITH_CUSTOM_AFFINITY = "cache_with_custom_affinity";
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -99,6 +104,11 @@ public class JavaThinCompatibilityTest extends AbstractClientCompatibilityTest {
if (ver.compareTo(VER_2_13_0) >= 0)
ignite.services().deployNodeSingleton("ctx_service", new CtxService());
+ IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(new CacheConfiguration<Integer, Integer>(CACHE_WITH_CUSTOM_AFFINITY)
+ .setAffinity(new CustomAffinity()));
+
+ cache.put(0, 0);
+
super.initNode(ignite);
}
@@ -435,6 +445,16 @@ public class JavaThinCompatibilityTest extends AbstractClientCompatibilityTest {
if (clientVer.compareTo(VER_2_14_0) >= 0)
testDataReplicationOperations(serverVer.compareTo(VER_2_14_0) >= 0);
+
+ if (clientVer.compareTo(VER_2_14_0) >= 0) {
+ // This wrapper is used to avoid serialization/deserialization issues when the `testClient` is
+ // tried to be deserialized on previous Ignite releases that do not contain a newly added classes.
+ // Such classes will be loaded by classloader only if a version of the thin client is match.
+ if (serverVer.compareTo(VER_2_14_0) >= 0)
+ ClientPartitionAwarenessMapperAPITestWrapper.testCustomPartitionAwarenessMapper();
+ else if (serverVer.compareTo(VER_2_11_0) >= 0) // Partition awareness available from.
+ ClientPartitionAwarenessMapperAPITestWrapper.testCustomPartitionAwarenessMapperThrows();
+ }
}
/** */
@@ -593,4 +613,8 @@ public class JavaThinCompatibilityTest extends AbstractClientCompatibilityTest {
return results.get(0).getData();
}
}
+
+ /** */
+ public static class CustomAffinity extends RendezvousAffinityFunction {
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/client/ClientPartitionAwarenessMapper.java b/modules/core/src/main/java/org/apache/ignite/client/ClientPartitionAwarenessMapper.java
new file mode 100644
index 00000000000..64ea78b9282
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/client/ClientPartitionAwarenessMapper.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityKeyMapper;
+import org.apache.ignite.configuration.ClientConfiguration;
+
+/**
+ * This function calculates the cache key to a partition mapping for each cache key. It is used only for local calculation on a client side.
+ * <p>
+ * When the {@link ClientConfiguration#isPartitionAwarenessEnabled()} and the cache was created with a custom {@link AffinityFunction}
+ * or a {@link AffinityKeyMapper} this function will be used to calculate mappings. Be sure that a key maps to the same partition
+ * produced by the {@link AffinityFunction#partition(Object)} method.
+ *
+ * @see AffinityFunction#partition(Object)
+ * @since 2.14
+ */
+public interface ClientPartitionAwarenessMapper {
+ /**
+ * Gets a partition number for a given key starting from {@code 0}. Be sure that a key maps to the same partition
+ * produced by the {@link AffinityFunction#partition(Object)} method.
+ *
+ * @param key Key to get partition for.
+ * @return Partition number for a given key.
+ */
+ public int partition(Object key);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/client/ClientPartitionAwarenessMapperFactory.java b/modules/core/src/main/java/org/apache/ignite/client/ClientPartitionAwarenessMapperFactory.java
new file mode 100644
index 00000000000..c26421dda84
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/client/ClientPartitionAwarenessMapperFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityKeyMapper;
+
+/**
+ * This factory is used on the client side and only when the partition awareness thin client feature is enabled. By default,
+ * on a new cache the RendezvousAffinityFunction will be used for calculating mappings 'key-to-partition' and 'partition-to-node'.
+ * The thin client will update all 'partitions-to-node' mappings on every cluster topology change and the 'key-to-partition'
+ * mapping will be calculated on the client side.
+ * <p>
+ * The case described above will not be possible (and in turn partition awareness won't work) when a custom {@link AffinityFunction} or
+ * a {@link AffinityKeyMapper} was previously used for a cache creation. The partition awareness mapper factory is used to solve this
+ * issue. All 'partition-to-node' mappings will still be requested and received from a server node, however, if a custom AffinityFunction
+ * or a custom AffinityKeyMapper was used a ClientPartitionAwarenessMapper produced by this factory will calculate mapping a key to
+ * a partition.
+ * <p>
+ * These key to partition mapping functions produced by the factory are used only for local calculations, they will not be passed
+ * to a server node.
+ *
+ * @see AffinityFunction
+ * @since 2.14
+ */
+public interface ClientPartitionAwarenessMapperFactory {
+ /**
+ * @param cacheName Cache name to create a mapper for.
+ * @param partitions Number of cache partitions received from a server node.
+ * @return Key to a partition mapper function.
+ */
+ public ClientPartitionAwarenessMapper create(String cacheName, int partitions);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
index 7a0ffbefd36..f3eb294965c 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
@@ -25,6 +25,8 @@ import java.util.concurrent.ForkJoinPool;
import javax.cache.configuration.Factory;
import javax.net.ssl.SSLContext;
import org.apache.ignite.client.ClientAddressFinder;
+import org.apache.ignite.client.ClientPartitionAwarenessMapper;
+import org.apache.ignite.client.ClientPartitionAwarenessMapperFactory;
import org.apache.ignite.client.ClientRetryAllPolicy;
import org.apache.ignite.client.ClientRetryPolicy;
import org.apache.ignite.client.SslMode;
@@ -111,6 +113,13 @@ public final class ClientConfiguration implements Serializable {
*/
private boolean partitionAwarenessEnabled = true;
+ /**
+ * This factory accepts as parameters a cache name and the number of cache partitions received from a server node and produces
+ * a {@link ClientPartitionAwarenessMapper}. This mapper function is used only for local calculations key to a partition and
+ * will not be passed to a server node.
+ */
+ private ClientPartitionAwarenessMapperFactory partitionAwarenessMapperFactory;
+
/**
* Reconnect throttling period (in milliseconds). There are no more than {@code reconnectThrottlingRetries}
* attempts to reconnect will be made within {@code reconnectThrottlingPeriod} in case of connection loss.
@@ -786,4 +795,23 @@ public final class ClientConfiguration implements Serializable {
return this;
}
+
+ /**
+ * @param factory Factory that accepts as parameters a cache name and the number of cache partitions received from a server node
+ * and produces key to partition mapping functions.
+ * @return {@code this} for chaining.
+ */
+ public ClientConfiguration setPartitionAwarenessMapperFactory(ClientPartitionAwarenessMapperFactory factory) {
+ partitionAwarenessMapperFactory = factory;
+
+ return this;
+ }
+
+ /**
+ * @return Factory that accepts as parameters a cache name and the number of cache partitions received from a server node
+ * and produces key to partition mapping functions.
+ */
+ public ClientPartitionAwarenessMapperFactory getPartitionAwarenessMapperFactory() {
+ return partitionAwarenessMapperFactory;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
index 90a26300da8..3c2e9686372 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
@@ -19,34 +19,61 @@ package org.apache.ignite.internal.client.thin;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import org.apache.ignite.IgniteBinary;
+import org.apache.ignite.client.ClientPartitionAwarenessMapper;
+import org.apache.ignite.client.ClientPartitionAwarenessMapperFactory;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
/**
* Client cache partition awareness context.
*/
public class ClientCacheAffinityContext {
+ /** If a factory needs to be removed. */
+ private static final long REMOVED_TS = 0;
+
+ /**
+ * Factory for each cache id to produce key to partition mapping functions.
+ * This factory is also used to resolve cacheName from cacheId. If a cache has default affinity mappings then
+ * it will be cleared on the next affinity mapping request.
+ */
+ final Map<Integer, ClientPartitionAwarenessMapperHolder> cacheKeyMapperFactoryMap = new HashMap<>();
+
/** Binary data processor. */
private final IgniteBinary binary;
+ /** Mapper factory from client configuration that is used for cache groups with custom affinity. */
+ private final ClientPartitionAwarenessMapperFactory paMapFactory;
+
/** Contains last topology version and known nodes of this version. */
private final AtomicReference<TopologyNodes> lastTop = new AtomicReference<>();
+ /** Cache IDs, which should be included to the next affinity mapping request. */
+ private final Set<Integer> pendingCacheIds = new GridConcurrentHashSet<>();
+
/** Current affinity mapping. */
private volatile ClientCacheAffinityMapping affinityMapping;
- /** Cache IDs, which should be included to the next affinity mapping request. */
- private final Set<Integer> pendingCacheIds = new GridConcurrentHashSet<>();
+ /** Caches that have been requested partition mappings for. */
+ private volatile CacheMappingRequest rq;
/**
* @param binary Binary data processor.
+ * @param factory Factory for caches with custom affinity.
*/
- public ClientCacheAffinityContext(IgniteBinary binary) {
+ public ClientCacheAffinityContext(IgniteBinary binary, @Nullable ClientPartitionAwarenessMapperFactory factory) {
+ this.paMapFactory = factory;
this.binary = binary;
}
@@ -81,42 +108,39 @@ public class ClientCacheAffinityContext {
* @param cacheId Cache id.
*/
public boolean affinityUpdateRequired(int cacheId) {
- TopologyNodes top = lastTop.get();
-
- if (top == null) { // Don't know current topology.
- pendingCacheIds.add(cacheId);
-
- return false;
- }
-
- ClientCacheAffinityMapping mapping = affinityMapping;
-
- if (mapping == null) {
- pendingCacheIds.add(cacheId);
-
- return true;
- }
+ ClientCacheAffinityMapping mapping = currentMapping();
- if (top.topVer.compareTo(mapping.topologyVersion()) > 0) {
+ if (mapping == null || !mapping.cacheIds().contains(cacheId)) {
pendingCacheIds.add(cacheId);
return true;
}
- if (mapping.cacheIds().contains(cacheId))
- return false;
- else {
- pendingCacheIds.add(cacheId);
-
- return true;
- }
+ return false;
}
/**
* @param ch Payload output channel.
*/
public void writePartitionsUpdateRequest(PayloadOutputChannel ch) {
- ClientCacheAffinityMapping.writeRequest(ch, pendingCacheIds);
+ assert rq == null : "Previous mapping request was not properly handled: " + rq;
+
+ final Set<Integer> cacheIds;
+ long lastAccessed;
+
+ synchronized (cacheKeyMapperFactoryMap) {
+ cacheIds = new HashSet<>(pendingCacheIds);
+
+ lastAccessed = cacheIds.stream()
+ .map(cacheKeyMapperFactoryMap::get)
+ .filter(Objects::nonNull)
+ .mapToLong(h -> h.ts)
+ .reduce(Math::max)
+ .orElse(0);
+ }
+
+ rq = new CacheMappingRequest(cacheIds, lastAccessed);
+ ClientCacheAffinityMapping.writeRequest(ch, rq.caches, rq.ts > 0);
}
/**
@@ -126,30 +150,64 @@ public class ClientCacheAffinityContext {
if (lastTop.get() == null)
return false;
- ClientCacheAffinityMapping newMapping = ClientCacheAffinityMapping.readResponse(ch);
+ CacheMappingRequest rq0 = rq;
+
+ ClientCacheAffinityMapping newMapping = ClientCacheAffinityMapping.readResponse(ch,
+ new Function<Integer, Function<Integer, ClientPartitionAwarenessMapper>>() {
+ @Override public Function<Integer, ClientPartitionAwarenessMapper> apply(Integer cacheId) {
+ synchronized (cacheKeyMapperFactoryMap) {
+ ClientPartitionAwarenessMapperHolder hld = cacheKeyMapperFactoryMap.get(cacheId);
+
+ // Factory concurrently removed on cache destroy.
+ if (paMapFactory == null || hld == null || hld.ts == REMOVED_TS)
+ return null;
+
+ if (hld.factory == null)
+ hld.factory = (parts) -> paMapFactory.create(hld.cacheName, parts);
+
+ return hld.factory;
+ }
+ }
+ }
+ );
+
+ synchronized (cacheKeyMapperFactoryMap) {
+ cacheKeyMapperFactoryMap.entrySet()
+ .removeIf(e -> {
+ // Process only requested caches.
+ if (!rq0.caches.contains(e.getKey()))
+ return false;
+
+ if (newMapping.cacheIds().contains(e.getKey())) {
+ // Remove caches that have default affinity.
+ return e.getValue().factory == null;
+ }
+ else {
+ // Requested, but not received caches means that they have been destoryed on the server side.
+ return e.getValue().ts <= rq0.ts;
+ }
+ });
+ }
+
+ rq = null;
ClientCacheAffinityMapping oldMapping = affinityMapping;
if (oldMapping == null || newMapping.topologyVersion().compareTo(oldMapping.topologyVersion()) > 0) {
affinityMapping = newMapping;
+ // Re-request mappings that are out of date.
if (oldMapping != null)
pendingCacheIds.addAll(oldMapping.cacheIds());
- pendingCacheIds.removeAll(newMapping.cacheIds());
-
- return true;
}
-
- if (newMapping.topologyVersion().equals(oldMapping.topologyVersion())) {
+ else if (newMapping.topologyVersion().equals(oldMapping.topologyVersion()))
affinityMapping = ClientCacheAffinityMapping.merge(oldMapping, newMapping);
- pendingCacheIds.removeAll(newMapping.cacheIds());
+ pendingCacheIds.removeAll(newMapping.cacheIds());
+ // Some caches can be requested, but not received from server (destoroyed on the server side).
+ pendingCacheIds.removeAll(rq0.caches);
- return true;
- }
-
- // Obsolete mapping.
return true;
}
@@ -202,7 +260,7 @@ public class ClientCacheAffinityContext {
/**
* Current affinity mapping.
*/
- private ClientCacheAffinityMapping currentMapping() {
+ protected ClientCacheAffinityMapping currentMapping() {
TopologyNodes top = lastTop.get();
if (top == null)
@@ -219,6 +277,33 @@ public class ClientCacheAffinityContext {
return mapping;
}
+ /**
+ * @param cacheName Cache name.
+ */
+ public void registerCache(String cacheName) {
+ synchronized (cacheKeyMapperFactoryMap) {
+ ClientPartitionAwarenessMapperHolder hld = cacheKeyMapperFactoryMap.computeIfAbsent(ClientUtils.cacheId(cacheName),
+ id -> new ClientPartitionAwarenessMapperHolder(cacheName));
+
+ hld.ts = U.currentTimeMillis();
+ }
+ }
+
+ /**
+ * @param cacheName Cache name.
+ */
+ public void unregisterCache(String cacheName) {
+ synchronized (cacheKeyMapperFactoryMap) {
+ ClientPartitionAwarenessMapperHolder hld = cacheKeyMapperFactoryMap.get(ClientUtils.cacheId(cacheName));
+
+ if (hld == null)
+ return;
+
+ // Schedule cache factory remove.
+ hld.ts = REMOVED_TS;
+ }
+ }
+
/**
* Holder for list of nodes for topology version.
*/
@@ -246,4 +331,49 @@ public class ClientCacheAffinityContext {
return Collections.unmodifiableCollection(nodes);
}
}
+
+ /** Holder of a mapper factory and cacheName. */
+ private static class ClientPartitionAwarenessMapperHolder {
+ /** Cache name. */
+ private final String cacheName;
+
+ /** Factory. */
+ private @Nullable Function<Integer, ClientPartitionAwarenessMapper> factory;
+
+ /** Last accessed timestamp. */
+ private long ts;
+
+ /**
+ * @param cacheName Cache name.
+ */
+ public ClientPartitionAwarenessMapperHolder(String cacheName) {
+ this.cacheName = cacheName;
+ }
+ }
+
+ /** Request of cache mappings. */
+ private static class CacheMappingRequest {
+ /** Cache ids which have been requested. */
+ private final Set<Integer> caches;
+
+ /** Request timestamp. */
+ private final long ts;
+
+ /**
+ * @param caches Cache ids which have been requested.
+ * @param ts Request timestamp.
+ */
+ public CacheMappingRequest(Set<Integer> caches, long ts) {
+ this.caches = caches;
+ this.ts = ts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "CacheMappingRequest{" +
+ "caches=" + caches +
+ ", ts=" + ts +
+ '}';
+ }
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityMapping.java
index c3d4bd4f67c..b0e47e0b594 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityMapping.java
@@ -24,14 +24,18 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.function.Function;
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
+import org.apache.ignite.client.ClientPartitionAwarenessMapper;
import org.apache.ignite.internal.binary.BinaryObjectExImpl;
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import static org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.ALL_AFFINITY_MAPPINGS;
/**
* Affinity mapping (partition to nodes) for each cache.
@@ -39,7 +43,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
public class ClientCacheAffinityMapping {
/** CacheAffinityInfo for caches with not applicable partition awareness. */
private static final CacheAffinityInfo NOT_APPLICABLE_CACHE_AFFINITY_INFO =
- new CacheAffinityInfo(null, null);
+ new CacheAffinityInfo(null, null, null);
/** Topology version. */
private final AffinityTopologyVersion topVer;
@@ -131,8 +135,7 @@ public class ClientCacheAffinityMapping {
assert res.topVer.equals(mapping.topVer) : "Mappings must have identical topology versions [res.topVer=" +
res.topVer + ", mapping.topVer=" + mapping.topVer + ']';
- for (Map.Entry<Integer, CacheAffinityInfo> entry : mapping.cacheAffinity.entrySet())
- res.cacheAffinity.put(entry.getKey(), entry.getValue());
+ res.cacheAffinity.putAll(mapping.cacheAffinity);
}
return res;
@@ -142,11 +145,20 @@ public class ClientCacheAffinityMapping {
* Writes caches affinity request to the output channel.
*
* @param ch Output channel.
- * @param cacheIds Cache IDs.
+ * @param cacheIds Set of cache ids to request.
+ * @param customMappingsRequired {@code true} if non-default affinity mappings required.
*/
- public static void writeRequest(PayloadOutputChannel ch, Collection<Integer> cacheIds) {
+ public static void writeRequest(PayloadOutputChannel ch, Collection<Integer> cacheIds, boolean customMappingsRequired) {
+ ProtocolContext ctx = ch.clientChannel().protocolCtx();
+
+ if (customMappingsRequired && !ctx.isFeatureSupported(ALL_AFFINITY_MAPPINGS))
+ throw new ClientFeatureNotSupportedByServerException(ALL_AFFINITY_MAPPINGS);
+
BinaryOutputStream out = ch.out();
+ if (ctx.isFeatureSupported(ALL_AFFINITY_MAPPINGS))
+ out.writeBoolean(customMappingsRequired);
+
out.writeInt(cacheIds.size());
for (int cacheId : cacheIds)
@@ -158,8 +170,12 @@ public class ClientCacheAffinityMapping {
* from this response.
*
* @param ch Input channel.
+ * @param mappers Function that produces key mapping functions.
*/
- public static ClientCacheAffinityMapping readResponse(PayloadInputChannel ch) {
+ public static ClientCacheAffinityMapping readResponse(
+ PayloadInputChannel ch,
+ Function<Integer, Function<Integer, ClientPartitionAwarenessMapper>> mappers
+ ) {
try (BinaryReaderExImpl in = ClientUtils.createBinaryReader(null, ch.in())) {
long topVer = in.readLong();
int minorTopVer = in.readInt();
@@ -174,7 +190,7 @@ public class ClientCacheAffinityMapping {
int cachesCnt = in.readInt();
- if (applicable) { // Partition awareness is applicable for this caches.
+ if (applicable) { // Partition awareness is applicable for these caches.
Map<Integer, Map<Integer, Integer>> cacheKeyCfg = U.newHashMap(cachesCnt);
for (int j = 0; j < cachesCnt; j++)
@@ -182,10 +198,24 @@ public class ClientCacheAffinityMapping {
UUID[] partToNode = readNodePartitions(in);
- for (Map.Entry<Integer, Map<Integer, Integer>> keyCfg : cacheKeyCfg.entrySet())
- aff.cacheAffinity.put(keyCfg.getKey(), new CacheAffinityInfo(keyCfg.getValue(), partToNode));
+ boolean dfltMapping = true;
+
+ if (ch.clientChannel().protocolCtx().isFeatureSupported(ALL_AFFINITY_MAPPINGS))
+ dfltMapping = in.readBoolean();
+
+ for (Map.Entry<Integer, Map<Integer, Integer>> keyCfg : cacheKeyCfg.entrySet()) {
+ Function<Integer, ClientPartitionAwarenessMapper> factory = dfltMapping ?
+ RendezvousAffinityKeyMapper::new : mappers.apply(keyCfg.getKey());
+
+ // Cache was concurrently destroyed.
+ if (factory == null)
+ continue;
+
+ aff.cacheAffinity.put(keyCfg.getKey(),
+ new CacheAffinityInfo(keyCfg.getValue(), partToNode, factory.apply(partToNode.length)));
+ }
}
- else { // Partition awareness is not applicable for this caches.
+ else { // Partition awareness is not applicable for these caches.
for (int j = 0; j < cachesCnt; j++)
aff.cacheAffinity.put(in.readInt(), NOT_APPLICABLE_CACHE_AFFINITY_INFO);
}
@@ -255,18 +285,18 @@ public class ClientCacheAffinityMapping {
/** Partition mapping. */
private final UUID[] partMapping;
- /** Affinity mask. */
- private final int affinityMask;
+ /** Mapper a cache key to a partition. */
+ private final ClientPartitionAwarenessMapper keyMapper;
/**
* @param keyCfg Cache key configuration or {@code null} if partition awareness is not applicable for this cache.
* @param partMapping Partition to node mapping or {@code null} if partition awareness is not applicable for
* this cache.
*/
- private CacheAffinityInfo(Map<Integer, Integer> keyCfg, UUID[] partMapping) {
+ private CacheAffinityInfo(Map<Integer, Integer> keyCfg, UUID[] partMapping, ClientPartitionAwarenessMapper keyMapper) {
this.keyCfg = keyCfg;
this.partMapping = partMapping;
- affinityMask = partMapping != null ? RendezvousAffinityFunction.calculateMask(partMapping.length) : 0;
+ this.keyMapper = keyMapper;
}
/**
@@ -275,11 +305,10 @@ public class ClientCacheAffinityMapping {
* @param key Key.
*/
private UUID nodeForKey(Object key) {
- assert partMapping != null;
+ if (keyMapper == null)
+ return null;
- int part = RendezvousAffinityFunction.calculatePartition(key, affinityMask, partMapping.length);
-
- return partMapping[part];
+ return nodeForPartition(keyMapper.partition(key));
}
/**
@@ -288,9 +317,32 @@ public class ClientCacheAffinityMapping {
* @param part Partition.
*/
private UUID nodeForPartition(int part) {
- assert partMapping != null;
+ if (part < 0 || partMapping == null || part >= partMapping.length)
+ return null;
+
+ return partMapping[part];
+ }
+ }
+
+ /** Default implementation of cache key to partition mapper. */
+ private static class RendezvousAffinityKeyMapper implements ClientPartitionAwarenessMapper {
+ /** Number of partitions. */
+ private final int parts;
+
+ /** Affinity mask. */
+ private final int affinityMask;
+
+ /**
+ * @param parts Number of partitions.
+ */
+ private RendezvousAffinityKeyMapper(int parts) {
+ this.parts = parts;
+ affinityMask = RendezvousAffinityFunction.calculateMask(parts);
+ }
- return part >= 0 && part < partMapping.length ? partMapping[part] : null;
+ /** {@inheritDoc} */
+ @Override public int partition(Object key) {
+ return RendezvousAffinityFunction.calculatePartition(key, affinityMask, parts);
}
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
index 0f437251562..c538c7e6345 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
@@ -62,7 +62,10 @@ public enum ProtocolBitmaskFeature {
HEARTBEAT(11),
/** Data replication operations: {@link TcpClientCache#putAllConflict}, {@link TcpClientCache#removeAllConflict}. */
- DATA_REPLICATION_OPERATIONS(12);
+ DATA_REPLICATION_OPERATIONS(12),
+
+ /** Send all mappings to the client including non-default affinity functions. */
+ ALL_AFFINITY_MAPPINGS(13);
/** */
private static final EnumSet<ProtocolBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index 50a1dc5213e..b92e8770669 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -45,6 +45,7 @@ import org.apache.ignite.client.ClientAuthorizationException;
import org.apache.ignite.client.ClientConnectionException;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.ClientOperationType;
+import org.apache.ignite.client.ClientPartitionAwarenessMapperFactory;
import org.apache.ignite.client.ClientRetryPolicy;
import org.apache.ignite.client.ClientRetryPolicyContext;
import org.apache.ignite.client.IgniteClientFuture;
@@ -130,7 +131,7 @@ final class ReliableChannel implements AutoCloseable {
partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled();
- affinityCtx = new ClientCacheAffinityContext(binary);
+ affinityCtx = new ClientCacheAffinityContext(binary, clientCfg.getPartitionAwarenessMapperFactory());
connMgr = new GridNioClientConnectionMultiplexer(clientCfg);
connMgr.start();
@@ -402,6 +403,25 @@ final class ReliableChannel implements AutoCloseable {
return serviceAsync(op, payloadWriter, payloadReader);
}
+ /**
+ * @param cacheName Cache name.
+ */
+ public void registerCacheIfCustomAffinity(String cacheName) {
+ ClientPartitionAwarenessMapperFactory factory = clientCfg.getPartitionAwarenessMapperFactory();
+
+ if (factory == null)
+ return;
+
+ affinityCtx.registerCache(cacheName);
+ }
+
+ /**
+ * @param cacheName Cache name.
+ */
+ public void unregisterCacheIfCustomAffinity(String cacheName) {
+ affinityCtx.unregisterCache(cacheName);
+ }
+
/**
* Checks if affinity information for the cache is up to date and tries to update it if not.
*
@@ -857,6 +877,13 @@ final class ReliableChannel implements AutoCloseable {
return plc.shouldRetry(ctx);
}
+ /**
+ * @return Affinity context.
+ */
+ ClientCacheAffinityContext affinityContext() {
+ return affinityCtx;
+ }
+
/**
* Channels holder.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
index 4efa4e5183b..9b86e1c4efb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
@@ -125,6 +125,8 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
this.expiryPlc = expiryPlc;
jCacheAdapter = new ClientJCacheAdapter<>(this);
+
+ this.ch.registerCacheIfCustomAffinity(this.name);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
index f96e2c822dc..f1d0bd5b90d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
@@ -235,13 +235,17 @@ public class TcpIgniteClient implements IgniteClient {
ensureCacheName(name);
ch.request(ClientOperation.CACHE_DESTROY, req -> req.out().writeInt(ClientUtils.cacheId(name)));
+ ch.unregisterCacheIfCustomAffinity(name);
}
/** {@inheritDoc} */
@Override public IgniteClientFuture<Void> destroyCacheAsync(String name) throws ClientException {
ensureCacheName(name);
- return ch.requestAsync(ClientOperation.CACHE_DESTROY, req -> req.out().writeInt(ClientUtils.cacheId(name)));
+ return ch.requestAsync(ClientOperation.CACHE_DESTROY, req -> {
+ req.out().writeInt(ClientUtils.cacheId(name));
+ ch.unregisterCacheIfCustomAffinity(name);
+ });
}
/** {@inheritDoc} */
@@ -414,6 +418,13 @@ public class TcpIgniteClient implements IgniteClient {
return new TcpIgniteClient(cfg);
}
+ /**
+ * @return Channel.
+ */
+ ReliableChannel reliableChannel() {
+ return ch;
+ }
+
/** @throws IllegalArgumentException if the specified cache name is invalid. */
private static void ensureCacheName(String name) {
if (name == null || name.isEmpty())
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
index 88a25fe7df3..d4beaef89db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
@@ -63,7 +63,10 @@ public enum ClientBitmaskFeature implements ThinProtocolFeature {
HEARTBEAT(11),
/** Data replication operations: {@link TcpClientCache#putAllConflict}, {@link TcpClientCache#removeAllConflict}. */
- DATA_REPLICATION_OPERATIONS(12);
+ DATA_REPLICATION_OPERATIONS(12),
+
+ /** Send all mappings to the client including non-default affinity functions. */
+ ALL_AFFINITY_MAPPINGS(13);
/** */
private static final EnumSet<ClientBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
index 0156e855bb4..04843bca0f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
@@ -549,7 +549,7 @@ public class ClientMessageParser implements ClientListenerMessageParser {
return new ClientCacheNodePartitionsRequest(reader);
case OP_CACHE_PARTITIONS:
- return new ClientCachePartitionsRequest(reader);
+ return new ClientCachePartitionsRequest(reader, protocolCtx);
case OP_CACHE_GET_NAMES:
return new ClientCacheGetNamesRequest(reader);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionAwarenessGroup.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionAwarenessGroup.java
index 273fbe7a60b..2cd20c97ea1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionAwarenessGroup.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionAwarenessGroup.java
@@ -18,73 +18,60 @@
package org.apache.ignite.internal.processors.platform.client.cache;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
import org.apache.ignite.binary.BinaryRawWriter;
import org.apache.ignite.cache.CacheKeyConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.platform.client.ClientBitmaskFeature;
+import org.apache.ignite.internal.processors.platform.client.ClientProtocolContext;
+import org.jetbrains.annotations.Nullable;
/**
- * Partition mapping associated with the group of caches.
+ * Partitions mapping associated with a group of caches. This group may contain caches from different cache groups,
+ * the grouping criteria is - the same mapping and default or non-default affinity function flag.
*/
class ClientCachePartitionAwarenessGroup {
- /** Binary processor. */
- CacheObjectBinaryProcessorImpl proc;
+ /** Partition mapping. If {@code null} then cache must be excluded in partition awareness usage (e.g. REPLICATED cache). */
+ private final @Nullable ClientCachePartitionMapping mapping;
- /** Partition mapping. */
- private final ClientCachePartitionMapping mapping;
+ /** {@code true} if the RendezvousAffinityFunction is used with the default affinity key mapper. */
+ private final boolean dfltAffinity;
/** Descriptor of the associated caches. */
- private HashMap<Integer, CacheConfiguration> cacheCfgs;
+ private final Map<Integer, CacheConfiguration<?, ?>> cacheCfgs = new HashMap<>();
/**
- * @param proc Binary processor.
* @param mapping Partition mapping.
- * @param cacheDesc Descriptor of the initial cache.
+ * @param dfltAffinity {@code true} if the default affinity or a custom affinity mapper was used.
*/
- public ClientCachePartitionAwarenessGroup(CacheObjectBinaryProcessorImpl proc, ClientCachePartitionMapping mapping,
- DynamicCacheDescriptor cacheDesc) {
- this.proc = proc;
+ public ClientCachePartitionAwarenessGroup(@Nullable ClientCachePartitionMapping mapping, boolean dfltAffinity) {
this.mapping = mapping;
-
- int cacheId = cacheDesc.cacheId();
- CacheConfiguration ccfg = cacheDesc.cacheConfiguration();
-
- cacheCfgs = new HashMap<>();
- cacheCfgs.put(cacheId, ccfg);
- }
-
- /**
- * Check if the mapping is compatible to a mapping of the group.
- * @param mapping Affinity mapping.
- * @return True if compatible.
- */
- public boolean isCompatible(ClientCachePartitionMapping mapping) {
- // All unapplicable caches go to the same single group, so they are all compatible one to another.
- if (this.mapping == null || mapping == null)
- return this.mapping == mapping;
-
- // Now we need to compare mappings themselves.
- return mapping.isCompatible(mapping);
+ this.dfltAffinity = dfltAffinity;
}
/**
* Write mapping using binary writer.
- * @param writer Writer.
+ *
+ * @param proc Binary processor.
+ * @param writer Binary Writer.
+ * @param cpctx Protocol context.
*/
- public void write(BinaryRawWriter writer) {
+ public void write(CacheObjectBinaryProcessorImpl proc, BinaryRawWriter writer, ClientProtocolContext cpctx) {
writer.writeBoolean(mapping != null);
writer.writeInt(cacheCfgs.size());
- for (Map.Entry<Integer, CacheConfiguration> entry: cacheCfgs.entrySet()) {
+ for (Map.Entry<Integer, CacheConfiguration<?, ?>> entry: cacheCfgs.entrySet()) {
writer.writeInt(entry.getKey());
if (mapping == null)
continue;
- CacheConfiguration ccfg = entry.getValue();
+ CacheConfiguration<?, ?> ccfg = entry.getValue();
CacheKeyConfiguration[] keyCfgs = ccfg.getKeyConfiguration();
if (keyCfgs == null) {
@@ -106,13 +93,35 @@ class ClientCachePartitionAwarenessGroup {
if (mapping != null)
mapping.write(writer);
+
+ if (cpctx.isFeatureSupported(ClientBitmaskFeature.ALL_AFFINITY_MAPPINGS))
+ writer.writeBoolean(dfltAffinity);
}
/**
- * Add cache to affinity group.
- * @param desc Cache descriptor.
+ * Add caches to the same affinity group.
+ * @param descs Cache descriptors.
*/
- public void addCache(DynamicCacheDescriptor desc) {
- cacheCfgs.put(desc.cacheId(), desc.cacheConfiguration());
+ public void addAll(List<DynamicCacheDescriptor> descs) {
+ for (DynamicCacheDescriptor desc : descs)
+ cacheCfgs.putIfAbsent(desc.cacheId(), desc.cacheConfiguration());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ ClientCachePartitionAwarenessGroup group = (ClientCachePartitionAwarenessGroup)o;
+
+ return dfltAffinity == group.dfltAffinity && Objects.equals(mapping, group.mapping);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return Objects.hash(mapping, dfltAffinity);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionMapping.java
index 8bda3a90711..8a9c641812b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionMapping.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.platform.client.cache;
import java.util.HashMap;
+import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.binary.BinaryRawWriter;
@@ -32,10 +33,9 @@ public class ClientCachePartitionMapping {
private final HashMap<UUID, Set<Integer>> partitionMap;
/**
- * @param cacheId Cache ID.
* @param assignment Affinity assignment.
*/
- public ClientCachePartitionMapping(int cacheId, AffinityAssignment assignment) {
+ public ClientCachePartitionMapping(AffinityAssignment assignment) {
Set<ClusterNode> nodes = assignment.primaryPartitionNodes();
partitionMap = new HashMap<>(nodes.size());
@@ -67,12 +67,21 @@ public class ClientCachePartitionMapping {
}
}
- /**
- * Check if the mapping is compatible to another one.
- * @param another Another mapping.
- * @return True if compatible.
- */
- public boolean isCompatible(ClientCachePartitionMapping another) {
- return partitionMap.equals(another.partitionMap);
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ ClientCachePartitionMapping mapping = (ClientCachePartitionMapping)o;
+
+ return Objects.equals(partitionMap, mapping.partitionMap);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return Objects.hash(partitionMap);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsRequest.java
index 00f5ea73425..062d0ebf528 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsRequest.java
@@ -18,41 +18,60 @@
package org.apache.ignite.internal.processors.platform.client.cache;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.ignite.binary.BinaryRawReader;
import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.affinity.AffinityKeyMapper;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
-import org.apache.ignite.internal.processors.cache.CacheDefaultBinaryAffinityKeyMapper;
+import org.apache.ignite.internal.processors.cache.CacheType;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.platform.client.ClientAffinityTopologyVersion;
+import org.apache.ignite.internal.processors.platform.client.ClientBitmaskFeature;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientProtocolContext;
import org.apache.ignite.internal.processors.platform.client.ClientRequest;
import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.apache.ignite.internal.util.lang.gridfunc.NotContainsPredicate;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;
+import static java.util.Optional.ofNullable;
+import static org.apache.ignite.internal.processors.query.QueryUtils.isCustomAffinityMapper;
/**
* Cluster node list request.
- * Currently used to request list of nodes, to calculate affinity on the client side.
+ * Currently, used to request list of nodes, to calculate affinity on the client side.
*/
public class ClientCachePartitionsRequest extends ClientRequest {
/** IDs of caches. */
private final int[] cacheIds;
+ /**
+ * {@code true} if a custom mapping factory is set on the client side and mappings for all requested caches
+ * need to be sent back to the client even if they are related to a custom affinity function.
+ */
+ private final boolean withCustomMappings;
+
/**
* Initializes a new instance of ClientRawRequest class.
* @param reader Reader.
*/
- public ClientCachePartitionsRequest(BinaryRawReader reader) {
+ public ClientCachePartitionsRequest(BinaryRawReader reader, ClientProtocolContext protocolCtx) {
super(reader);
+ if (protocolCtx.isFeatureSupported(ClientBitmaskFeature.ALL_AFFINITY_MAPPINGS))
+ withCustomMappings = reader.readBoolean();
+ else
+ withCustomMappings = false;
+
int len = reader.readInt();
cacheIds = new int[len];
@@ -63,111 +82,78 @@ public class ClientCachePartitionsRequest extends ClientRequest {
/** {@inheritDoc} */
@Override public ClientResponse process(ClientConnectionContext ctx) {
- ArrayList<ClientCachePartitionAwarenessGroup> groups = new ArrayList<>(cacheIds.length);
- HashMap<Integer, ClientCachePartitionAwarenessGroup> cacheGroupIds = new HashMap<>(cacheIds.length);
-
+ Map<ClientCachePartitionAwarenessGroup, ClientCachePartitionAwarenessGroup> grps = new HashMap<>(cacheIds.length);
ClientAffinityTopologyVersion affinityVer = ctx.checkAffinityTopologyVersion();
- // As a fisrt step, get a set of mappings that we need to return.
- // To do that, check if any of the caches listed in request can be grouped.
- for (int cacheId : cacheIds) {
- DynamicCacheDescriptor cacheDesc = ctx.kernalContext().cache().cacheDescriptor(cacheId);
+ Set<Integer> affectedGroupIds = Arrays.stream(cacheIds)
+ .mapToObj(id -> ctx.kernalContext().cache().cacheDescriptor(id))
+ .filter(Objects::nonNull)
+ .map(DynamicCacheDescriptor::groupId)
+ .collect(Collectors.toSet());
- // Just ignoring, if the cache is absent - i.e. was deleted concurrently.
- if (cacheDesc == null)
- continue;
+ Map<Integer, List<DynamicCacheDescriptor>> allCaches = ctx.kernalContext().cache().cacheDescriptors().values()
+ .stream()
+ .filter(Objects::nonNull)
+ .filter(c -> c.cacheType() == CacheType.USER || c.cacheType() == CacheType.DATA_STRUCTURES)
+ .collect(Collectors.groupingBy(DynamicCacheDescriptor::groupId));
- ClientCachePartitionAwarenessGroup grp = processCache(ctx, groups, cacheGroupIds, affinityVer, cacheDesc);
+ // As a first step, get a set of mappings that we need to return.
+ // To do that, check if any of the caches listed in request can be grouped.
+ for (List<DynamicCacheDescriptor> affected : F.view(allCaches, affectedGroupIds::contains).values()) {
+ ClientCachePartitionAwarenessGroup grp = processCache(ctx, affinityVer, F.first(affected), withCustomMappings);
- // Cache already processed.
if (grp == null)
continue;
- groups.add(grp);
- cacheGroupIds.put(cacheDesc.groupId(), grp);
+ ofNullable(grps.putIfAbsent(grp, grp))
+ .orElse(grp)
+ .addAll(affected);
}
- Map<String, DynamicCacheDescriptor> allCaches = ctx.kernalContext().cache().cacheDescriptors();
-
// As a second step, check all other caches and add them to groups they are compatible with.
- for (DynamicCacheDescriptor cacheDesc: allCaches.values()) {
- // Ignoring system caches
- if (!cacheDesc.cacheType().userCache())
+ for (List<DynamicCacheDescriptor> descs : F.view(allCaches, new NotContainsPredicate<>(affectedGroupIds)).values()) {
+ ClientCachePartitionAwarenessGroup grp = processCache(ctx, affinityVer, F.first(descs), withCustomMappings);
+
+ if (grp == null)
continue;
- processCache(ctx, groups, cacheGroupIds, affinityVer, cacheDesc);
+ ClientCachePartitionAwarenessGroup grp0 = grps.get(grp);
+
+ if (grp0 != null)
+ grp0.addAll(descs);
}
- return new ClientCachePartitionsResponse(requestId(), groups, affinityVer);
+ return new ClientCachePartitionsResponse(requestId(), new ArrayList<>(grps.keySet()), affinityVer);
}
/**
* Process cache and create new partition mapping, if it does not belong to any existent.
* @param ctx Connection context.
- * @param groups Cache affinity groups.
- * @param cacheGroupIds Map of known group IDs.
* @param affinityVer Affinity topology version.
* @param cacheDesc Cache descriptor.
+ * @param withCustomMappings {@code true} to verify a non-default affinity function also.
* @return Null if cache was processed and new client cache partition awareness group if it does not belong to any
* existent.
*/
private static ClientCachePartitionAwarenessGroup processCache(
ClientConnectionContext ctx,
- List<ClientCachePartitionAwarenessGroup> groups,
- Map<Integer, ClientCachePartitionAwarenessGroup> cacheGroupIds,
ClientAffinityTopologyVersion affinityVer,
- DynamicCacheDescriptor cacheDesc
+ DynamicCacheDescriptor cacheDesc,
+ boolean withCustomMappings
) {
- int cacheGroupId = cacheDesc.groupId();
- int cacheId = cacheDesc.cacheId();
-
- ClientCachePartitionAwarenessGroup group = cacheGroupIds.get(cacheGroupId);
- if (group != null) {
- // Cache group is found. It means that cache belongs to one of cache groups with known mapping.
- // Just adding our cache to this group here.
- group.addCache(cacheDesc);
-
- return null;
- }
-
- AffinityAssignment assignment = getCacheAssignment(ctx, affinityVer, cacheId);
+ AffinityAssignment assignment = getCacheAssignment(ctx, affinityVer, cacheDesc.cacheId());
// If assignment is not available for the cache for required affinity version, ignore the cache.
if (assignment == null)
return null;
ClientCachePartitionMapping mapping = null;
- if (isApplicable(cacheDesc.cacheConfiguration()))
- mapping = new ClientCachePartitionMapping(cacheId, assignment);
-
- group = getCompatibleGroup(groups, mapping);
- if (group != null) {
- group.addCache(cacheDesc);
- cacheGroupIds.put(cacheGroupId, group);
-
- return null;
- }
-
- CacheObjectBinaryProcessorImpl proc = (CacheObjectBinaryProcessorImpl)ctx.kernalContext().cacheObjects();
- return new ClientCachePartitionAwarenessGroup(proc, mapping, cacheDesc);
- }
-
- /**
- * Get cache partition awareness group which is compatible with the mapping.
- * @param groups Group list.
- * @param mapping Partition mapping.
- * @return Compatible cache partition awareness group if present, or null.
- */
- @Nullable private static ClientCachePartitionAwarenessGroup getCompatibleGroup(
- List<ClientCachePartitionAwarenessGroup> groups,
- ClientCachePartitionMapping mapping) {
- for (ClientCachePartitionAwarenessGroup group : groups) {
- if (group.isCompatible(mapping))
- return group;
- }
+ if (isApplicable(cacheDesc.cacheConfiguration(), withCustomMappings))
+ mapping = new ClientCachePartitionMapping(assignment);
- return null;
+ return new ClientCachePartitionAwarenessGroup(mapping,
+ !withCustomMappings || isDefaultMapping(cacheDesc.cacheConfiguration()));
}
/**
@@ -190,28 +176,36 @@ public class ClientCachePartitionsRequest extends ClientRequest {
/**
* @param ccfg Cache configuration.
+ * @param withCustomMappings {@code true} to verify a non-default affinity function also.
* @return True if cache is applicable for partition awareness optimisation.
*/
- private static boolean isApplicable(CacheConfiguration ccfg) {
+ private static boolean isApplicable(CacheConfiguration<?, ?> ccfg, boolean withCustomMappings) {
// Partition could be extracted only from PARTITIONED caches.
if (ccfg.getCacheMode() != CacheMode.PARTITIONED)
return false;
- // Only caches with no custom affinity key mapper is supported.
- AffinityKeyMapper keyMapper = ccfg.getAffinityMapper();
- if (!(keyMapper instanceof CacheDefaultBinaryAffinityKeyMapper))
- return false;
-
- // Only RendezvousAffinityFunction is supported for now.
- if (!ccfg.getAffinity().getClass().equals(RendezvousAffinityFunction.class))
- return false;
-
- IgnitePredicate filter = ccfg.getNodeFilter();
+ IgnitePredicate<?> filter = ccfg.getNodeFilter();
boolean hasNodeFilter = filter != null && !(filter instanceof CacheConfiguration.IgniteAllNodesPredicate);
// We cannot be sure that two caches are co-located if custom node filter is present.
// Note that technically we may try to compare two filters. However, this adds unnecessary complexity
// and potential deserialization issues.
- return !hasNodeFilter;
+ if (hasNodeFilter)
+ return false;
+
+ return withCustomMappings || isDefaultMapping(ccfg);
+ }
+
+ /**
+ * @param ccfg Cache configuration.
+ * @return {@code true} if the default affinity was used for cache.
+ */
+ public static boolean isDefaultMapping(CacheConfiguration<?, ?> ccfg) {
+ // Only caches with no custom affinity key mapper is supported.
+ if (isCustomAffinityMapper(ccfg.getAffinityMapper()))
+ return false;
+
+ // Only RendezvousAffinityFunction is supported for now.
+ return ccfg.getAffinity().getClass().equals(RendezvousAffinityFunction.class);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsResponse.java
index ca487728050..747d40009af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePartitionsResponse.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.platform.client.cache;
import java.util.ArrayList;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.platform.client.ClientAffinityTopologyVersion;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientResponse;
@@ -52,12 +53,13 @@ class ClientCachePartitionsResponse extends ClientResponse {
@Override public void encode(ClientConnectionContext ctx, BinaryRawWriterEx writer) {
encode(ctx, writer, affinityVer);
+ CacheObjectBinaryProcessorImpl proc = (CacheObjectBinaryProcessorImpl)ctx.kernalContext().cacheObjects();
+
affinityVer.write(writer);
writer.writeInt(mappings.size());
- for (ClientCachePartitionAwarenessGroup mapping : mappings) {
- mapping.write(writer);
- }
+ for (ClientCachePartitionAwarenessGroup mapping : mappings)
+ mapping.write(proc, writer, ctx.currentProtocolContext());
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
index 2909c4e9d60..279d31865cf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
@@ -18,7 +18,14 @@
package org.apache.ignite.internal.client.thin;
import java.lang.management.ThreadInfo;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.ClientPartitionAwarenessMapper;
+import org.apache.ignite.client.ClientPartitionAwarenessMapperFactory;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
@@ -55,6 +62,79 @@ public class ThinClientPartitionAwarenessResourceReleaseTest extends ThinClientA
assertTrue(GridTestUtils.waitForCondition(() -> threadsCount(THREAD_PREFIX) == 0, 1_000L));
}
+ /**
+ * @throws Exception If fails.
+ */
+ @Test
+ public void testResourcesReleasedAfterCacheDestroyed() throws Exception {
+ int cacheId = CU.cacheId(PART_CUSTOM_AFFINITY_CACHE_NAME);
+ startGrids(2);
+
+ initClient(getClientConfiguration(0, 1)
+ .setPartitionAwarenessMapperFactory(new ClientPartitionAwarenessMapperFactory() {
+ /** {@inheritDoc} */
+ @Override public ClientPartitionAwarenessMapper create(String cacheName, int partitions) {
+ assertEquals(cacheName, PART_CUSTOM_AFFINITY_CACHE_NAME);
+
+ AffinityFunction aff = new RendezvousAffinityFunction(false, partitions);
+
+ return aff::partition;
+ }
+ }), 0, 1);
+
+ ClientCache<Object, Object> clientCache = client.cache(PART_CUSTOM_AFFINITY_CACHE_NAME);
+ IgniteInternalCache<Object, Object> gridCache = grid(0).context().cache().cache(PART_CUSTOM_AFFINITY_CACHE_NAME);
+
+ clientCache.put(0, 0);
+ TestTcpClientChannel opCh = affinityChannel(0, gridCache);
+
+ assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+ assertOpOnChannel(opCh, ClientOperation.CACHE_PUT);
+
+ for (int i = 1; i < KEY_CNT; i++)
+ clientCache.put(i, i);
+
+ ClientCacheAffinityContext affCtx = ((TcpIgniteClient)client).reliableChannel().affinityContext();
+ AffinityTopologyVersion ver = affCtx.currentMapping().topologyVersion();
+
+ grid(0).destroyCache(PART_CUSTOM_AFFINITY_CACHE_NAME);
+ awaitPartitionMapExchange();
+
+ // Cache destroyed, but mappings still exist on the client side.
+ assertEquals(opCh.serverNodeId(), affCtx.affinityNode(cacheId, Integer.valueOf(0)));
+
+ client.cache(PART_CACHE_NAME).put(1, 1);
+
+ // await mappings updated.
+ assertTrue(GridTestUtils.waitForCondition(() -> {
+ ClientCacheAffinityMapping m = affCtx.currentMapping();
+
+ if (m == null)
+ return false;
+
+ return m.topologyVersion().equals(ver.nextMinorVersion());
+ }, 5_000L));
+
+ // Mapping for previous caches become outdated and will be updated on the next request.
+ assertNull(affCtx.currentMapping().affinityNode(cacheId, 0));
+
+ // Trigger the next affinity mappings update. The outdated cache with custom affinity was added
+ // to pending caches list and will be processed and cleared.
+ client.cache(REPL_CACHE_NAME).put(2, 2);
+
+ assertTrue(GridTestUtils.waitForCondition(affCtx.cacheKeyMapperFactoryMap::isEmpty, 5000L));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+
+ if (client != null)
+ client.close();
+ }
+
/**
* Gets threads count with a given name.
*/
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java
index 7f79555d9d9..5556da02ee5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java
@@ -18,16 +18,19 @@
package org.apache.ignite.internal.client.thin;
import java.util.function.Function;
-
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.client.ClientAtomicConfiguration;
import org.apache.ignite.client.ClientAtomicLong;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.ClientCollectionConfiguration;
import org.apache.ignite.client.ClientIgniteSet;
+import org.apache.ignite.client.ClientPartitionAwarenessMapper;
+import org.apache.ignite.client.ClientPartitionAwarenessMapperFactory;
import org.apache.ignite.configuration.AtomicConfiguration;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.datastructures.GridCacheAtomicLongEx;
@@ -73,6 +76,29 @@ public class ThinClientPartitionAwarenessStableTopologyTest extends ThinClientAb
testNotApplicableCache(PART_CUSTOM_AFFINITY_CACHE_NAME);
}
+ /**
+ * Test that partition awareness is applicable for partitioned cache with custom affinity function
+ * and key to partition mapping function is set on the client side.
+ */
+ @Test
+ public void testPartitionedCustomAffinityCacheWithMapper() throws Exception {
+ client.close();
+
+ initClient(getClientConfiguration(1, 2, 3)
+ .setPartitionAwarenessMapperFactory(new ClientPartitionAwarenessMapperFactory() {
+ /** {@inheritDoc} */
+ @Override public ClientPartitionAwarenessMapper create(String cacheName, int partitions) {
+ assertEquals(cacheName, PART_CUSTOM_AFFINITY_CACHE_NAME);
+
+ AffinityFunction aff = new RendezvousAffinityFunction(false, partitions);
+
+ return aff::partition;
+ }
+ }), 1, 2);
+
+ testApplicableCache(PART_CUSTOM_AFFINITY_CACHE_NAME, i -> i);
+ }
+
/**
* Test partition awareness for all applicable operation types for partitioned cache with primitive key.
*/