You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by is...@apache.org on 2021/03/31 12:49:36 UTC

[ignite-python-thin-client] branch master updated: IGNITE-14444 Move affinity mapping storage and best node calculation to clients

This is an automated email from the ASF dual-hosted git repository.

isapego pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-python-thin-client.git


The following commit(s) were added to refs/heads/master by this push:
     new 7cbfe32  IGNITE-14444 Move affinity mapping storage and best node calculation to clients
7cbfe32 is described below

commit 7cbfe324eb2fb16335b51191cbed9b0e1dc8c88d
Author: Ivan Dashchinskiy <iv...@gmail.com>
AuthorDate: Wed Mar 31 15:49:07 2021 +0300

    IGNITE-14444 Move affinity mapping storage and best node calculation to clients
    
    This closes #26
---
 pyignite/aio_cache.py                           | 183 ++++--------------
 pyignite/aio_client.py                          |  92 ++++++++-
 pyignite/cache.py                               | 240 ++++++------------------
 pyignite/client.py                              | 161 +++++++++++++++-
 tests/affinity/test_affinity.py                 |   4 +-
 tests/affinity/test_affinity_request_routing.py | 136 +++++++++++++-
 tests/common/test_cache_class.py                |   4 +-
 tests/common/test_cache_size.py                 |   8 +-
 tests/util.py                                   |   1 -
 9 files changed, 476 insertions(+), 353 deletions(-)

diff --git a/pyignite/aio_cache.py b/pyignite/aio_cache.py
index a2af0a7..24d4bce 100644
--- a/pyignite/aio_cache.py
+++ b/pyignite/aio_cache.py
@@ -13,15 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import asyncio
-from typing import Any, Dict, Iterable, Optional, Union
+from typing import Any, Iterable, Optional, Union
 
-from .constants import AFFINITY_RETRIES, AFFINITY_DELAY
-from .connection import AioConnection
-from .datatypes import prop_codes
-from .datatypes.base import IgniteDataType
 from .datatypes.internal import AnyDataObject
-from .exceptions import CacheCreationError, CacheError, ParameterError, connection_errors
-from .utils import cache_id, status_to_exception
+from .exceptions import CacheCreationError, CacheError, ParameterError
+from .utils import status_to_exception
 from .api.cache_config import (
     cache_create_async, cache_get_or_create_async, cache_destroy_async, cache_get_configuration_async,
     cache_create_with_config_async, cache_get_or_create_with_config_async
@@ -34,8 +30,7 @@ from .api.key_value import (
     cache_remove_if_equals_async, cache_replace_if_equals_async, cache_get_size_async,
 )
 from .cursors import AioScanCursor
-from .api.affinity import cache_get_node_partitions_async
-from .cache import __parse_settings, BaseCacheMixin
+from .cache import __parse_settings, BaseCache
 
 
 async def get_cache(client: 'AioClient', settings: Union[str, dict]) -> 'AioCache':
@@ -76,13 +71,13 @@ async def get_or_create_cache(client: 'AioClient', settings: Union[str, dict]) -
     return AioCache(client, name)
 
 
-class AioCache(BaseCacheMixin):
+class AioCache(BaseCache):
     """
     Ignite cache abstraction. Users should never use this class directly,
     but construct its instances with
-    :py:meth:`~pyignite.client.Client.create_cache`,
-    :py:meth:`~pyignite.client.Client.get_or_create_cache` or
-    :py:meth:`~pyignite.client.Client.get_cache` methods instead. See
+    :py:meth:`~pyignite.aio_client.AioClient.create_cache`,
+    :py:meth:`~pyignite.aio_client.AioClient.get_or_create_cache` or
+    :py:meth:`~pyignite.aio_client.AioClient.get_cache` methods instead. See
     :ref:`this example <create_cache>` on how to do it.
     """
     def __init__(self, client: 'AioClient', name: str):
@@ -92,12 +87,10 @@ class AioCache(BaseCacheMixin):
         :param client: Async Ignite client,
         :param name: Cache name.
         """
-        self._client = client
-        self._name = name
-        self._cache_id = cache_id(self._name)
-        self._settings = None
-        self._affinity_query_mux = asyncio.Lock()
-        self.affinity = {'version': (0, 0)}
+        super().__init__(client, name)
+
+    async def _get_best_node(self, key=None, key_hint=None):
+        return await self.client.get_best_node(self._cache_id, key, key_hint)
 
     async def settings(self) -> Optional[dict]:
         """
@@ -109,7 +102,7 @@ class AioCache(BaseCacheMixin):
         :return: dict of cache properties and their values.
         """
         if self._settings is None:
-            conn = await self.get_best_node()
+            conn = await self._get_best_node()
             config_result = await cache_get_configuration_async(conn, self._cache_id)
 
             if config_result.status == 0:
@@ -119,121 +112,15 @@ class AioCache(BaseCacheMixin):
 
         return self._settings
 
-    async def name(self) -> str:
-        """
-        Lazy cache name.
-
-        :return: cache name string.
-        """
-        if self._name is None:
-            settings = await self.settings()
-            self._name = settings[prop_codes.PROP_NAME]
-
-        return self._name
-
-    @property
-    def client(self) -> 'AioClient':
-        """
-        Ignite :class:`~pyignite.aio_client.AioClient` object.
-
-        :return: Async client object, through which the cache is accessed.
-        """
-        return self._client
-
-    @property
-    def cache_id(self) -> int:
-        """
-        Cache ID.
-
-        :return: integer value of the cache ID.
-        """
-        return self._cache_id
-
     @status_to_exception(CacheError)
     async def destroy(self):
         """
         Destroys cache with a given name.
         """
-        conn = await self.get_best_node()
+        conn = await self._get_best_node()
         return await cache_destroy_async(conn, self._cache_id)
 
     @status_to_exception(CacheError)
-    async def _get_affinity(self, conn: 'AioConnection') -> Dict:
-        """
-        Queries server for affinity mappings. Retries in case
-        of an intermittent error (most probably “Getting affinity for topology
-        version earlier than affinity is calculated”).
-
-        :param conn: connection to Igneite server,
-        :return: OP_CACHE_PARTITIONS operation result value.
-        """
-        for _ in range(AFFINITY_RETRIES or 1):
-            result = await cache_get_node_partitions_async(conn, self._cache_id)
-            if result.status == 0 and result.value['partition_mapping']:
-                break
-            await asyncio.sleep(AFFINITY_DELAY)
-
-        return result
-
-    async def get_best_node(self, key: Any = None, key_hint: 'IgniteDataType' = None) -> 'AioConnection':
-        """
-        Returns the node from the list of the nodes, opened by client, that
-        most probably contains the needed key-value pair. See IEP-23.
-
-        This method is not a part of the public API. Unless you wish to
-        extend the `pyignite` capabilities (with additional testing, logging,
-        examining connections, et c.) you probably should not use it.
-
-        :param key: (optional) pythonic key,
-        :param key_hint: (optional) Ignite data type, for which the given key
-         should be converted,
-        :return: Ignite connection object.
-        """
-        conn = await self._client.random_node()
-
-        if self.client.partition_aware and key is not None:
-            if self.__should_update_mapping():
-                async with self._affinity_query_mux:
-                    while self.__should_update_mapping():
-                        try:
-                            full_affinity = await self._get_affinity(conn)
-                            self._update_affinity(full_affinity)
-
-                            asyncio.ensure_future(
-                                asyncio.gather(
-                                    *[conn.reconnect() for conn in self.client._nodes if not conn.alive],
-                                    return_exceptions=True
-                                )
-                            )
-
-                            break
-                        except connection_errors:
-                            # retry if connection failed
-                            conn = await self._client.random_node()
-                            pass
-                        except CacheError:
-                            # server did not create mapping in time
-                            return conn
-
-            parts = self.affinity.get('number_of_partitions')
-
-            if not parts:
-                return conn
-
-            key, key_hint = self._get_affinity_key(key, key_hint)
-
-            hashcode = await key_hint.hashcode_async(key, self._client)
-
-            best_node = self._get_node_by_hashcode(hashcode, parts)
-            if best_node:
-                return best_node
-
-        return conn
-
-    def __should_update_mapping(self):
-        return self.affinity['version'] < self._client.affinity_version
-
-    @status_to_exception(CacheError)
     async def get(self, key, key_hint: object = None) -> Any:
         """
         Retrieves a value from cache by key.
@@ -246,7 +133,7 @@ class AioCache(BaseCacheMixin):
         if key_hint is None:
             key_hint = AnyDataObject.map_python_type(key)
 
-        conn = await self.get_best_node(key, key_hint)
+        conn = await self._get_best_node(key, key_hint)
         result = await cache_get_async(conn, self._cache_id, key, key_hint=key_hint)
         result.value = await self.client.unwrap_binary(result.value)
         return result
@@ -267,7 +154,7 @@ class AioCache(BaseCacheMixin):
         if key_hint is None:
             key_hint = AnyDataObject.map_python_type(key)
 
-        conn = await self.get_best_node(key, key_hint)
+        conn = await self._get_best_node(key, key_hint)
         return await cache_put_async(conn, self._cache_id, key, value, key_hint=key_hint, value_hint=value_hint)
 
     @status_to_exception(CacheError)
@@ -278,7 +165,7 @@ class AioCache(BaseCacheMixin):
         :param keys: list of keys or tuples of (key, key_hint),
         :return: a dict of key-value pairs.
         """
-        conn = await self.get_best_node()
+        conn = await self._get_best_node()
         result = await cache_get_all_async(conn, self._cache_id, keys)
         if result.value:
             keys = list(result.value.keys())
@@ -298,7 +185,7 @@ class AioCache(BaseCacheMixin):
          to save. Each key or value can be an item of representable
          Python type or a tuple of (item, hint),
         """
-        conn = await self.get_best_node()
+        conn = await self._get_best_node()
         return await cache_put_all_async(conn, self._cache_id, pairs)
 
     @status_to_exception(CacheError)
@@ -316,7 +203,7 @@ class AioCache(BaseCacheMixin):
         if key_hint is None:
             key_hint = AnyDataObject.map_python_type(key)
 
-        conn = await self.get_best_node(key, key_hint)
+        conn = await self._get_best_node(key, key_hint)
         result = await cache_replace_async(conn, self._cache_id, key, value, key_hint=key_hint, value_hint=value_hint)
         result.value = await self.client.unwrap_binary(result.value)
         return result
@@ -329,7 +216,7 @@ class AioCache(BaseCacheMixin):
         :param keys: (optional) list of cache keys or (key, key type
          hint) tuples to clear (default: clear all).
         """
-        conn = await self.get_best_node()
+        conn = await self._get_best_node()
         if keys:
             return await cache_clear_keys_async(conn, self._cache_id, keys)
         else:
@@ -347,7 +234,7 @@ class AioCache(BaseCacheMixin):
         if key_hint is None:
             key_hint = AnyDataObject.map_python_type(key)
 
-        conn = await self.get_best_node(key, key_hint)
+        conn = await self._get_best_node(key, key_hint)
         return await cache_clear_key_async(conn, self._cache_id, key, key_hint=key_hint)
 
     @status_to_exception(CacheError)
@@ -357,7 +244,7 @@ class AioCache(BaseCacheMixin):
 
         :param keys: a list of keys or (key, type hint) tuples
         """
-        conn = await self.get_best_node()
+        conn = await self._get_best_node()
         return await cache_clear_keys_async(conn, self._cache_id, keys)
 
     @status_to_exception(CacheError)
@@ -373,7 +260,7 @@ class AioCache(BaseCacheMixin):
         if key_hint is None:
             key_hint = AnyDataObject.map_python_type(key)
 
-        conn = await self.get_best_node(key, key_hint)
+        conn = await self._get_best_node(key, key_hint)
         return await cache_contains_key_async(conn, self._cache_id, key, key_hint=key_hint)
 
     @status_to_exception(CacheError)
@@ -384,7 +271,7 @@ class AioCache(BaseCacheMixin):
         :param keys: a list of keys or (key, type hint) tuples,
         :return: boolean `True` when all keys are present, `False` otherwise.
         """
-        conn = await self.get_best_node()
+        conn = await self._get_best_node()
         return await cache_contains_keys_async(conn, self._cache_id, keys)
 
     @status_to_exception(CacheError)
@@ -404,7 +291,7 @@ class AioCache(BaseCacheMixin):
         if key_hint is None:
             key_hint = AnyDataObject.map_python_type(key)
 
-        conn = await self.get_best_node(key, key_hint)
+        conn = await self._get_best_node(key, key_hint)
         result = await cache_get_and_put_async(conn, self._cache_id, key, value, key_hint, value_hint)
 
         result.value = await self.client.unwrap_binary(result.value)
@@ -427,7 +314,7 @@ class AioCache(BaseCacheMixin):
         if key_hint is None:
             key_hint = AnyDataObject.map_python_type(key)
 
-        conn = await self.get_best_node(key, key_hint)
+        conn = await self._get_best_node(key, key_hint)
         result = await cache_get_and_put_if_absent_async(conn, self._cache_id, key, value, key_hint, value_hint)
         result.value = await self.client.unwrap_binary(result.value)
         return result
@@ -448,7 +335,7 @@ class AioCache(BaseCacheMixin):
         if key_hint is None:
             key_hint = AnyDataObject.map_python_type(key)
 
-        conn = await self.get_best_node(key, key_hint)
+        conn = await self._get_best_node(key, key_hint)
         return await cache_put_if_absent_async(conn, self._cache_id, key, value, key_hint, value_hint)
 
     @status_to_exception(CacheError)
@@ -464,7 +351,7 @@ class AioCache(BaseCacheMixin):
         if key_hint is None:
             key_hint = AnyDataObject.map_python_type(key)
 
-        conn = await self.get_best_node(key, key_hint)
+        conn = await self._get_best_node(key, key_hint)
         result = await cache_get_and_remove_async(conn, self._cache_id, key, key_hint)
         result.value = await self.client.unwrap_binary(result.value)
         return result
@@ -487,7 +374,7 @@ class AioCache(BaseCacheMixin):
         if key_hint is None:
             key_hint = AnyDataObject.map_python_type(key)
 
-        conn = await self.get_best_node(key, key_hint)
+        conn = await self._get_best_node(key, key_hint)
         result = await cache_get_and_replace_async(conn, self._cache_id, key, value, key_hint, value_hint)
         result.value = await self.client.unwrap_binary(result.value)
         return result
@@ -504,7 +391,7 @@ class AioCache(BaseCacheMixin):
         if key_hint is None:
             key_hint = AnyDataObject.map_python_type(key)
 
-        conn = await self.get_best_node(key, key_hint)
+        conn = await self._get_best_node(key, key_hint)
         return await cache_remove_key_async(conn, self._cache_id, key, key_hint)
 
     @status_to_exception(CacheError)
@@ -515,7 +402,7 @@ class AioCache(BaseCacheMixin):
 
         :param keys: list of keys or tuples of (key, key_hint) to remove.
         """
-        conn = await self.get_best_node()
+        conn = await self._get_best_node()
         return await cache_remove_keys_async(conn, self._cache_id, keys)
 
     @status_to_exception(CacheError)
@@ -523,7 +410,7 @@ class AioCache(BaseCacheMixin):
         """
         Removes all cache entries, notifying listeners and cache writers.
         """
-        conn = await self.get_best_node()
+        conn = await self._get_best_node()
         return await cache_remove_all_async(conn, self._cache_id)
 
     @status_to_exception(CacheError)
@@ -542,7 +429,7 @@ class AioCache(BaseCacheMixin):
         if key_hint is None:
             key_hint = AnyDataObject.map_python_type(key)
 
-        conn = await self.get_best_node(key, key_hint)
+        conn = await self._get_best_node(key, key_hint)
         return await cache_remove_if_equals_async(conn, self._cache_id, key, sample, key_hint, sample_hint)
 
     @status_to_exception(CacheError)
@@ -565,7 +452,7 @@ class AioCache(BaseCacheMixin):
         if key_hint is None:
             key_hint = AnyDataObject.map_python_type(key)
 
-        conn = await self.get_best_node(key, key_hint)
+        conn = await self._get_best_node(key, key_hint)
         result = await cache_replace_if_equals_async(conn, self._cache_id, key, sample, value, key_hint, sample_hint,
                                                      value_hint)
         result.value = await self.client.unwrap_binary(result.value)
@@ -581,7 +468,7 @@ class AioCache(BaseCacheMixin):
          (PeekModes.BACKUP). Defaults to primary cache partitions (PeekModes.PRIMARY),
         :return: integer number of cache entries.
         """
-        conn = await self.get_best_node()
+        conn = await self._get_best_node()
         return await cache_get_size_async(conn, self._cache_id, peek_modes)
 
     def scan(self, page_size: int = 1, partitions: int = -1, local: bool = False):
diff --git a/pyignite/aio_client.py b/pyignite/aio_client.py
index d2cc3ff..5e64450 100644
--- a/pyignite/aio_client.py
+++ b/pyignite/aio_client.py
@@ -15,19 +15,21 @@
 import asyncio
 import random
 from itertools import chain
-from typing import Iterable, Type, Union, Any
+from typing import Iterable, Type, Union, Any, Dict
 
+from .api import cache_get_node_partitions_async
 from .api.binary import get_binary_type_async, put_binary_type_async
 from .api.cache_config import cache_get_names_async
+from .cache import BaseCache
 from .client import BaseClient
 from .cursors import AioSqlFieldsCursor
 from .aio_cache import AioCache, get_cache, create_cache, get_or_create_cache
 from .connection import AioConnection
-from .constants import IGNITE_DEFAULT_HOST, IGNITE_DEFAULT_PORT
+from .constants import AFFINITY_RETRIES, AFFINITY_DELAY
 from .datatypes import BinaryObject
 from .exceptions import BinaryTypeError, CacheError, ReconnectError, connection_errors
 from .stream import AioBinaryStream, READ_BACKWARD
-from .utils import cache_id, entity_id, status_to_exception, is_iterable, is_wrapped
+from .utils import cache_id, entity_id, status_to_exception, is_wrapped
 
 
 __all__ = ['AioClient']
@@ -72,6 +74,7 @@ class AioClient(BaseClient):
         """
         super().__init__(compact_footer, partition_aware, **kwargs)
         self._registry_mux = asyncio.Lock()
+        self._affinity_query_mux = asyncio.Lock()
 
     def connect(self, *args):
         """
@@ -271,6 +274,89 @@ class AioClient(BaseClient):
                 return await BinaryObject.to_python_async(stream.read_ctype(data_class, direction=READ_BACKWARD), self)
         return value
 
+    @status_to_exception(CacheError)
+    async def _get_affinity(self, conn: 'AioConnection', caches: Iterable[int]) -> Dict:
+        """
+        Queries server for affinity mappings. Retries in case
+        of an intermittent error (most probably “Getting affinity for topology
+        version earlier than affinity is calculated”).
+
+        :param conn: connection to Ignite server,
+        :param caches: Ids of caches,
+        :return: OP_CACHE_PARTITIONS operation result value.
+        """
+        for _ in range(AFFINITY_RETRIES or 1):
+            result = await cache_get_node_partitions_async(conn, caches)
+            if result.status == 0 and result.value['partition_mapping']:
+                break
+            await asyncio.sleep(AFFINITY_DELAY)
+
+        return result
+
+    async def get_best_node(
+            self, cache: Union[int, str, 'BaseCache'], key: Any = None, key_hint: 'IgniteDataType' = None
+    ) -> 'AioConnection':
+        """
+        Returns the node from the list of the nodes, opened by client, that
+        most probably contains the needed key-value pair. See IEP-23.
+
+        This method is not a part of the public API. Unless you wish to
+        extend the `pyignite` capabilities (with additional testing, logging,
+        examining connections, etc.) you probably should not use it.
+
+        :param cache: Ignite cache, cache name or cache id,
+        :param key: (optional) pythonic key,
+        :param key_hint: (optional) Ignite data type, for which the given key
+         should be converted,
+        :return: Ignite connection object.
+        """
+        conn = await self.random_node()
+
+        if self.partition_aware and key is not None:
+            caches = self._caches_to_update_affinity()
+            if caches:
+                async with self._affinity_query_mux:
+                    while True:
+                        caches = self._caches_to_update_affinity()
+                        if not caches:
+                            break
+
+                        try:
+                            full_affinity = await self._get_affinity(conn, caches)
+                            self._update_affinity(full_affinity)
+
+                            asyncio.ensure_future(
+                                asyncio.gather(
+                                    *[conn.reconnect() for conn in self._nodes if not conn.alive],
+                                    return_exceptions=True
+                                )
+                            )
+
+                            break
+                        except connection_errors:
+                            # retry if connection failed
+                            conn = await self.random_node()
+                            pass
+                        except CacheError:
+                            # server did not create mapping in time
+                            return conn
+
+            c_id = cache.cache_id if isinstance(cache, BaseCache) else cache_id(cache)
+            parts = self._cache_partition_mapping(c_id).get('number_of_partitions')
+
+            if not parts:
+                return conn
+
+            key, key_hint = self._get_affinity_key(c_id, key, key_hint)
+
+            hashcode = await key_hint.hashcode_async(key, self)
+
+            best_node = self._get_node_by_hashcode(c_id, hashcode, parts)
+            if best_node:
+                return best_node
+
+        return conn
+
     async def create_cache(self, settings: Union[str, dict]) -> 'AioCache':
         """
         Creates Ignite cache by name. Raises `CacheError` if such a cache is
diff --git a/pyignite/cache.py b/pyignite/cache.py
index 2602d1c..f00f000 100644
--- a/pyignite/cache.py
+++ b/pyignite/cache.py
@@ -13,15 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import time
-from typing import Any, Dict, Iterable, Optional, Tuple, Union
+from typing import Any, Iterable, Optional, Tuple, Union
 
-from .constants import AFFINITY_RETRIES, AFFINITY_DELAY
-from .binary import GenericObjectMeta
 from .datatypes import prop_codes
 from .datatypes.internal import AnyDataObject
-from .exceptions import CacheCreationError, CacheError, ParameterError, SQLError, connection_errors
-from .utils import cache_id, get_field_by_id, status_to_exception, unsigned
+from .exceptions import CacheCreationError, CacheError, ParameterError, SQLError
+from .utils import cache_id, status_to_exception
 from .api.cache_config import (
     cache_create, cache_create_with_config, cache_get_or_create, cache_get_or_create_with_config, cache_destroy,
     cache_get_configuration
@@ -33,7 +30,6 @@ from .api.key_value import (
     cache_remove_if_equals, cache_replace_if_equals, cache_get_size
 )
 from .cursors import ScanCursor, SqlCursor
-from .api.affinity import cache_get_node_partitions
 
 PROP_CODES = set([
     getattr(prop_codes, x)
@@ -96,65 +92,39 @@ def __parse_settings(settings: Union[str, dict]) -> Tuple[Optional[str], Optiona
         raise ParameterError('You should supply at least cache name')
 
 
-class BaseCacheMixin:
-    def _get_affinity_key(self, key, key_hint=None):
-        if key_hint is None:
-            key_hint = AnyDataObject.map_python_type(key)
-
-        if self.affinity.get('is_applicable'):
-            config = self.affinity.get('cache_config')
-            if config:
-                affinity_key_id = config.get(key_hint.type_id)
-
-                if affinity_key_id and isinstance(key, GenericObjectMeta):
-                    return get_field_by_id(key, affinity_key_id)
-
-        return key, key_hint
-
-    def _update_affinity(self, full_affinity):
-        self.affinity['version'] = full_affinity['version']
-
-        full_mapping = full_affinity.get('partition_mapping')
-        if full_mapping and self.cache_id in full_mapping:
-            self.affinity.update(full_mapping[self.cache_id])
+class BaseCache:
+    def __init__(self, client: 'BaseClient', name: str):
+        self._client = client
+        self._name = name
+        self._settings = None
+        self._cache_id = cache_id(self._name)
+        self._client.register_cache(self._cache_id)
 
-    def _get_node_by_hashcode(self, hashcode, parts):
+    @property
+    def name(self) -> str:
         """
-        Get node by key hashcode. Calculate partition and return node on that it is primary.
-        (algorithm is taken from `RendezvousAffinityFunction.java`)
+        :return: cache name string.
         """
+        return self._name
 
-        # calculate partition for key or affinity key
-        # (algorithm is taken from `RendezvousAffinityFunction.java`)
-        mask = parts - 1
-
-        if parts & mask == 0:
-            part = (hashcode ^ (unsigned(hashcode) >> 16)) & mask
-        else:
-            part = abs(hashcode // parts)
-
-        assert 0 <= part < parts, 'Partition calculation has failed'
-
-        node_mapping = self.affinity.get('node_mapping')
-        if not node_mapping:
-            return None
+    @property
+    def client(self) -> 'BaseClient':
+        """
+        :return: Client object, through which the cache is accessed.
+        """
+        return self._client
 
-        node_uuid, best_conn = None, None
-        for u, p in node_mapping.items():
-            if part in p:
-                node_uuid = u
-                break
+    @property
+    def cache_id(self) -> int:
+        """
+        Cache ID.
 
-        if node_uuid:
-            for n in self.client._nodes:
-                if n.uuid == node_uuid:
-                    best_conn = n
-                    break
-            if best_conn and best_conn.alive:
-                return best_conn
+        :return: integer value of the cache ID.
+        """
+        return self._cache_id
 
 
-class Cache(BaseCacheMixin):
+class Cache(BaseCache):
     """
     Ignite cache abstraction. Users should never use this class directly,
     but construct its instances with
@@ -171,11 +141,10 @@ class Cache(BaseCacheMixin):
         :param client: Ignite client,
         :param name: Cache name.
         """
-        self._client = client
-        self._name = name
-        self._settings = None
-        self._cache_id = cache_id(self._name)
-        self.affinity = {'version': (0, 0)}
+        super().__init__(client, name)
+
+    def _get_best_node(self, key=None, key_hint=None):
+        return self.client.get_best_node(self._cache_id, key, key_hint)
 
     @property
     def settings(self) -> Optional[dict]:
@@ -189,7 +158,7 @@ class Cache(BaseCacheMixin):
         """
         if self._settings is None:
             config_result = cache_get_configuration(
-                self.get_best_node(),
+                self._get_best_node(),
                 self._cache_id
             )
             if config_result.status == 0:
@@ -199,111 +168,12 @@ class Cache(BaseCacheMixin):
 
         return self._settings
 
-    @property
-    def name(self) -> str:
-        """
-        Lazy cache name.
-
-        :return: cache name string.
-        """
-        if self._name is None:
-            self._name = self.settings[prop_codes.PROP_NAME]
-
-        return self._name
-
-    @property
-    def client(self) -> 'Client':
-        """
-        Ignite :class:`~pyignite.client.Client` object.
-
-        :return: Client object, through which the cache is accessed.
-        """
-        return self._client
-
-    @property
-    def cache_id(self) -> int:
-        """
-        Cache ID.
-
-        :return: integer value of the cache ID.
-        """
-        return self._cache_id
-
     @status_to_exception(CacheError)
     def destroy(self):
         """
         Destroys cache with a given name.
         """
-        return cache_destroy(self.get_best_node(), self._cache_id)
-
-    @status_to_exception(CacheError)
-    def _get_affinity(self, conn: 'Connection') -> Dict:
-        """
-        Queries server for affinity mappings. Retries in case
-        of an intermittent error (most probably “Getting affinity for topology
-        version earlier than affinity is calculated”).
-
-        :param conn: connection to Igneite server,
-        :return: OP_CACHE_PARTITIONS operation result value.
-        """
-        for _ in range(AFFINITY_RETRIES or 1):
-            result = cache_get_node_partitions(conn, self._cache_id)
-            if result.status == 0 and result.value['partition_mapping']:
-                break
-            time.sleep(AFFINITY_DELAY)
-
-        return result
-
-    def get_best_node(self, key: Any = None, key_hint: 'IgniteDataType' = None) -> 'Connection':
-        """
-        Returns the node from the list of the nodes, opened by client, that
-        most probably contains the needed key-value pair. See IEP-23.
-
-        This method is not a part of the public API. Unless you wish to
-        extend the `pyignite` capabilities (with additional testing, logging,
-        examining connections, et c.) you probably should not use it.
-
-        :param key: (optional) pythonic key,
-        :param key_hint: (optional) Ignite data type, for which the given key
-         should be converted,
-        :return: Ignite connection object.
-        """
-        conn = self._client.random_node
-
-        if self.client.partition_aware and key is not None:
-            if self.affinity['version'] < self._client.affinity_version:
-                # update partition mapping
-                while True:
-                    try:
-                        full_affinity = self._get_affinity(conn)
-                        break
-                    except connection_errors:
-                        # retry if connection failed
-                        conn = self._client.random_node
-                        pass
-                    except CacheError:
-                        # server did not create mapping in time
-                        return conn
-
-                self._update_affinity(full_affinity)
-
-                for conn in self.client._nodes:
-                    if not conn.alive:
-                        conn.reconnect()
-
-            parts = self.affinity.get('number_of_partitions')
-
-            if not parts:
-                return conn
-
-            key, key_hint = self._get_affinity_key(key, key_hint)
-            hashcode = key_hint.hashcode(key, self._client)
-
-            best_node = self._get_node_by_hashcode(hashcode, parts)
-            if best_node:
-                return best_node
-
-        return conn
+        return cache_destroy(self._get_best_node(), self._cache_id)
 
     @status_to_exception(CacheError)
     def get(self, key, key_hint: object = None) -> Any:
@@ -319,7 +189,7 @@ class Cache(BaseCacheMixin):
             key_hint = AnyDataObject.map_python_type(key)
 
         result = cache_get(
-            self.get_best_node(key, key_hint),
+            self._get_best_node(key, key_hint),
             self._cache_id,
             key,
             key_hint=key_hint
@@ -346,7 +216,7 @@ class Cache(BaseCacheMixin):
             key_hint = AnyDataObject.map_python_type(key)
 
         return cache_put(
-            self.get_best_node(key, key_hint),
+            self._get_best_node(key, key_hint),
             self._cache_id, key, value,
             key_hint=key_hint, value_hint=value_hint
         )
@@ -359,7 +229,7 @@ class Cache(BaseCacheMixin):
         :param keys: list of keys or tuples of (key, key_hint),
         :return: a dict of key-value pairs.
         """
-        result = cache_get_all(self.get_best_node(), self._cache_id, keys)
+        result = cache_get_all(self._get_best_node(), self._cache_id, keys)
         if result.value:
             for key, value in result.value.items():
                 result.value[key] = self.client.unwrap_binary(value)
@@ -375,7 +245,7 @@ class Cache(BaseCacheMixin):
          to save. Each key or value can be an item of representable
          Python type or a tuple of (item, hint),
         """
-        return cache_put_all(self.get_best_node(), self._cache_id, pairs)
+        return cache_put_all(self._get_best_node(), self._cache_id, pairs)
 
     @status_to_exception(CacheError)
     def replace(
@@ -395,7 +265,7 @@ class Cache(BaseCacheMixin):
             key_hint = AnyDataObject.map_python_type(key)
 
         result = cache_replace(
-            self.get_best_node(key, key_hint),
+            self._get_best_node(key, key_hint),
             self._cache_id, key, value,
             key_hint=key_hint, value_hint=value_hint
         )
@@ -410,7 +280,7 @@ class Cache(BaseCacheMixin):
         :param keys: (optional) list of cache keys or (key, key type
          hint) tuples to clear (default: clear all).
         """
-        conn = self.get_best_node()
+        conn = self._get_best_node()
         if keys:
             return cache_clear_keys(conn, self._cache_id, keys)
         else:
@@ -429,7 +299,7 @@ class Cache(BaseCacheMixin):
             key_hint = AnyDataObject.map_python_type(key)
 
         return cache_clear_key(
-            self.get_best_node(key, key_hint),
+            self._get_best_node(key, key_hint),
             self._cache_id,
             key,
             key_hint=key_hint
@@ -443,7 +313,7 @@ class Cache(BaseCacheMixin):
         :param keys: a list of keys or (key, type hint) tuples
         """
 
-        return cache_clear_keys(self.get_best_node(), self._cache_id, keys)
+        return cache_clear_keys(self._get_best_node(), self._cache_id, keys)
 
     @status_to_exception(CacheError)
     def contains_key(self, key, key_hint=None) -> bool:
@@ -459,7 +329,7 @@ class Cache(BaseCacheMixin):
             key_hint = AnyDataObject.map_python_type(key)
 
         return cache_contains_key(
-            self.get_best_node(key, key_hint),
+            self._get_best_node(key, key_hint),
             self._cache_id,
             key,
             key_hint=key_hint
@@ -473,7 +343,7 @@ class Cache(BaseCacheMixin):
         :param keys: a list of keys or (key, type hint) tuples,
         :return: boolean `True` when all keys are present, `False` otherwise.
         """
-        return cache_contains_keys(self.get_best_node(), self._cache_id, keys)
+        return cache_contains_keys(self._get_best_node(), self._cache_id, keys)
 
     @status_to_exception(CacheError)
     def get_and_put(self, key, value, key_hint=None, value_hint=None) -> Any:
@@ -493,7 +363,7 @@ class Cache(BaseCacheMixin):
             key_hint = AnyDataObject.map_python_type(key)
 
         result = cache_get_and_put(
-            self.get_best_node(key, key_hint),
+            self._get_best_node(key, key_hint),
             self._cache_id,
             key, value,
             key_hint, value_hint
@@ -521,7 +391,7 @@ class Cache(BaseCacheMixin):
             key_hint = AnyDataObject.map_python_type(key)
 
         result = cache_get_and_put_if_absent(
-            self.get_best_node(key, key_hint),
+            self._get_best_node(key, key_hint),
             self._cache_id,
             key, value,
             key_hint, value_hint
@@ -546,7 +416,7 @@ class Cache(BaseCacheMixin):
             key_hint = AnyDataObject.map_python_type(key)
 
         return cache_put_if_absent(
-            self.get_best_node(key, key_hint),
+            self._get_best_node(key, key_hint),
             self._cache_id,
             key, value,
             key_hint, value_hint
@@ -566,7 +436,7 @@ class Cache(BaseCacheMixin):
             key_hint = AnyDataObject.map_python_type(key)
 
         result = cache_get_and_remove(
-            self.get_best_node(key, key_hint),
+            self._get_best_node(key, key_hint),
             self._cache_id,
             key,
             key_hint
@@ -595,7 +465,7 @@ class Cache(BaseCacheMixin):
             key_hint = AnyDataObject.map_python_type(key)
 
         result = cache_get_and_replace(
-            self.get_best_node(key, key_hint),
+            self._get_best_node(key, key_hint),
             self._cache_id,
             key, value,
             key_hint, value_hint
@@ -616,7 +486,7 @@ class Cache(BaseCacheMixin):
             key_hint = AnyDataObject.map_python_type(key)
 
         return cache_remove_key(
-            self.get_best_node(key, key_hint), self._cache_id, key, key_hint
+            self._get_best_node(key, key_hint), self._cache_id, key, key_hint
         )
 
     @status_to_exception(CacheError)
@@ -628,7 +498,7 @@ class Cache(BaseCacheMixin):
         :param keys: list of keys or tuples of (key, key_hint) to remove.
         """
         return cache_remove_keys(
-            self.get_best_node(), self._cache_id, keys
+            self._get_best_node(), self._cache_id, keys
         )
 
     @status_to_exception(CacheError)
@@ -636,7 +506,7 @@ class Cache(BaseCacheMixin):
         """
         Removes all cache entries, notifying listeners and cache writers.
         """
-        return cache_remove_all(self.get_best_node(), self._cache_id)
+        return cache_remove_all(self._get_best_node(), self._cache_id)
 
     @status_to_exception(CacheError)
     def remove_if_equals(self, key, sample, key_hint=None, sample_hint=None):
@@ -655,7 +525,7 @@ class Cache(BaseCacheMixin):
             key_hint = AnyDataObject.map_python_type(key)
 
         return cache_remove_if_equals(
-            self.get_best_node(key, key_hint),
+            self._get_best_node(key, key_hint),
             self._cache_id,
             key, sample,
             key_hint, sample_hint
@@ -685,7 +555,7 @@ class Cache(BaseCacheMixin):
             key_hint = AnyDataObject.map_python_type(key)
 
         result = cache_replace_if_equals(
-            self.get_best_node(key, key_hint),
+            self._get_best_node(key, key_hint),
             self._cache_id,
             key, sample, value,
             key_hint, sample_hint, value_hint
@@ -704,7 +574,7 @@ class Cache(BaseCacheMixin):
         :return: integer number of cache entries.
         """
         return cache_get_size(
-            self.get_best_node(), self._cache_id, peek_modes
+            self._get_best_node(), self._cache_id, peek_modes
         )
 
     def scan(self, page_size: int = 1, partitions: int = -1, local: bool = False):
diff --git a/pyignite/client.py b/pyignite/client.py
index 05df617..2f24c43 100644
--- a/pyignite/client.py
+++ b/pyignite/client.py
@@ -39,25 +39,28 @@ It returns a generator with result rows.
 :py:meth:`~pyignite.client.Client.query_binary_type` methods operates
 the local (class-wise) registry for Ignite Complex objects.
 """
-
+import time
 from collections import defaultdict, OrderedDict
 import random
 import re
 from itertools import chain
-from typing import Iterable, Type, Union, Any
+from typing import Iterable, Type, Union, Any, Dict
 
+from .api import cache_get_node_partitions
 from .api.binary import get_binary_type, put_binary_type
 from .api.cache_config import cache_get_names
 from .cursors import SqlFieldsCursor
-from .cache import Cache, create_cache, get_cache, get_or_create_cache
+from .cache import Cache, create_cache, get_cache, get_or_create_cache, BaseCache
 from .connection import Connection
-from .constants import IGNITE_DEFAULT_HOST, IGNITE_DEFAULT_PORT, PROTOCOL_BYTE_ORDER
-from .datatypes import BinaryObject
+from .constants import IGNITE_DEFAULT_HOST, IGNITE_DEFAULT_PORT, PROTOCOL_BYTE_ORDER, AFFINITY_RETRIES, AFFINITY_DELAY
+from .datatypes import BinaryObject, AnyDataObject
+from .datatypes.base import IgniteDataType
 from .datatypes.internal import tc_map
 from .exceptions import BinaryTypeError, CacheError, ReconnectError, connection_errors
 from .stream import BinaryStream, READ_BACKWARD
 from .utils import (
-    cache_id, capitalize, entity_id, schema_id, process_delimiter, status_to_exception, is_iterable, is_wrapped
+    cache_id, capitalize, entity_id, schema_id, process_delimiter, status_to_exception, is_iterable, is_wrapped,
+    get_field_by_id, unsigned
 )
 from .binary import GenericObjectMeta
 
@@ -79,6 +82,7 @@ class BaseClient:
         self._current_node = 0
         self._partition_aware = partition_aware
         self.affinity_version = (0, 0)
+        self._affinity = {'version': self.affinity_version, 'partition_mapping': defaultdict(dict)}
         self._protocol_version = None
 
     @property
@@ -242,6 +246,76 @@ class BaseClient:
                 return None
         return self._registry[type_id]
 
+    def register_cache(self, cache_id: int):
+        if self.partition_aware and cache_id not in self._affinity['partition_mapping']:
+            self._affinity['partition_mapping'][cache_id] = {}  # empty: affinity not fetched yet
+
+    def _get_affinity_key(self, cache_id, key, key_hint=None):
+        if key_hint is None:
+            key_hint = AnyDataObject.map_python_type(key)
+
+        cache_partition_mapping = self._cache_partition_mapping(cache_id)
+        if cache_partition_mapping and cache_partition_mapping.get('is_applicable'):
+            config = cache_partition_mapping.get('cache_config')
+            if config:
+                affinity_key_id = config.get(key_hint.type_id)
+
+                if affinity_key_id and isinstance(key, GenericObjectMeta):
+                    return get_field_by_id(key, affinity_key_id)
+
+        return key, key_hint
+
+    def _update_affinity(self, full_affinity):
+        self._affinity['version'] = full_affinity['version']
+
+        full_mapping = full_affinity.get('partition_mapping')
+        if full_mapping:
+            self._affinity['partition_mapping'].update(full_mapping)
+
+    def _caches_to_update_affinity(self):
+        if self._affinity['version'] < self.affinity_version:
+            return list(self._affinity['partition_mapping'].keys())
+        else:
+            return list(c_id for c_id, c_mapping in self._affinity['partition_mapping'].items() if not c_mapping)
+
+    def _cache_partition_mapping(self, cache_id):
+        return self._affinity['partition_mapping'][cache_id]
+
+    def _get_node_by_hashcode(self, cache_id, hashcode, parts):
+        """
+        Calculate the partition for a key hashcode and return the alive connection to its primary node,
+        or None if there is none (algorithm is taken from `RendezvousAffinityFunction.java`).
+        """
+
+        # calculate partition for key or affinity key: a power-of-two partition
+        # count uses the masked hash, any other count uses the absolute remainder
+        mask = parts - 1
+
+        if parts & mask == 0:
+            part = (hashcode ^ (unsigned(hashcode) >> 16)) & mask
+        else:
+            part = abs(hashcode) % parts
+
+        assert 0 <= part < parts, 'Partition calculation has failed'
+
+        node_mapping = self._cache_partition_mapping(cache_id).get('node_mapping')
+        if not node_mapping:
+            return None
+
+        node_uuid, best_conn = None, None
+        for u, p in node_mapping.items():
+            if part in p:
+                node_uuid = u
+                break
+
+        if node_uuid:
+            for n in self._nodes:
+                if n.uuid == node_uuid:
+                    best_conn = n
+                    break
+            if best_conn and best_conn.alive:
+                return best_conn
+
 
 class _ConnectionContextManager:
     def __init__(self, client, nodes):
@@ -476,6 +550,81 @@ class Client(BaseClient):
                 return BinaryObject.to_python(stream.read_ctype(data_class, direction=READ_BACKWARD), self)
         return value
 
+    @status_to_exception(CacheError)
+    def _get_affinity(self, conn: 'Connection', caches: Iterable[int]) -> Dict:
+        """
+        Queries server for affinity mappings. Retries in case
+        of an intermittent error (most probably “Getting affinity for topology
+        version earlier than affinity is calculated”).
+
+        :param conn: connection to Ignite server,
+        :param caches: Ids of caches,
+        :return: OP_CACHE_PARTITIONS operation result value.
+        """
+        for _ in range(AFFINITY_RETRIES or 1):
+            result = cache_get_node_partitions(conn, caches)
+            if result.status == 0 and result.value['partition_mapping']:
+                break
+            time.sleep(AFFINITY_DELAY)
+
+        return result
+
+    def get_best_node(
+            self, cache: Union[int, str, 'BaseCache'], key: Any = None, key_hint: 'IgniteDataType' = None
+    ) -> 'Connection':
+        """
+        Returns the node from the list of the nodes, opened by client, that
+        most probably contains the needed key-value pair. See IEP-23.
+
+        This method is not a part of the public API. Unless you wish to
+        extend the `pyignite` capabilities (with additional testing, logging,
+        examining connections, etc.) you probably should not use it.
+
+        :param cache: Ignite cache, cache name or cache id,
+        :param key: (optional) pythonic key,
+        :param key_hint: (optional) Ignite data type, for which the given key
+         should be converted,
+        :return: Ignite connection object (a random node if no best node is found).
+        """
+        conn = self.random_node
+
+        if self.partition_aware and key is not None:
+            caches = self._caches_to_update_affinity()
+            if caches:
+                # update partition mapping
+                while True:
+                    try:
+                        full_affinity = self._get_affinity(conn, caches)
+                        break
+                    except connection_errors:
+                        # retry if connection failed
+                        conn = self.random_node
+                        pass
+                    except CacheError:
+                        # server did not create mapping in time
+                        return conn
+
+                self._update_affinity(full_affinity)
+
+                for node in self._nodes:  # must not rebind `conn`: it is the fallback result below
+                    if not node.alive:
+                        node.reconnect()
+
+            c_id = cache.cache_id if isinstance(cache, BaseCache) else cache_id(cache)
+            parts = self._cache_partition_mapping(c_id).get('number_of_partitions')
+
+            if not parts:
+                return conn
+
+            key, key_hint = self._get_affinity_key(c_id, key, key_hint)
+            hashcode = key_hint.hashcode(key, self)
+
+            best_node = self._get_node_by_hashcode(c_id, hashcode, parts)
+            if best_node:
+                return best_node
+
+        return conn
+
     def create_cache(self, settings: Union[str, dict]) -> 'Cache':
         """
         Creates Ignite cache by name. Raises `CacheError` if such a cache is
diff --git a/tests/affinity/test_affinity.py b/tests/affinity/test_affinity.py
index b1bcec7..64b9cc5 100644
--- a/tests/affinity/test_affinity.py
+++ b/tests/affinity/test_affinity.py
@@ -309,7 +309,7 @@ def __check_best_node_calculation(client, cache, key, value, key_hint=None):
 
     def inner():
         cache.put(key, value, key_hint=key_hint)
-        best_node = cache.get_best_node(key, key_hint=key_hint)
+        best_node = client.get_best_node(cache, key, key_hint=key_hint)
 
         for node in filter(lambda n: n.alive, client._nodes):
             result = cache_local_peek(node, cache.cache_id, key, key_hint=key_hint)
@@ -318,7 +318,7 @@ def __check_best_node_calculation(client, cache, key, value, key_hint=None):
 
     async def inner_async():
         await cache.put(key, value, key_hint=key_hint)
-        best_node = await cache.get_best_node(key, key_hint=key_hint)
+        best_node = await client.get_best_node(cache, key, key_hint=key_hint)
 
         for node in filter(lambda n: n.alive, client._nodes):
             result = await cache_local_peek_async(node, cache.cache_id, key, key_hint=key_hint)
diff --git a/tests/affinity/test_affinity_request_routing.py b/tests/affinity/test_affinity_request_routing.py
index 64197ff..9c94aa4 100644
--- a/tests/affinity/test_affinity_request_routing.py
+++ b/tests/affinity/test_affinity_request_routing.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import asyncio
+import contextlib
 from collections import OrderedDict, deque
 import random
 
@@ -28,6 +29,11 @@ from pyignite.datatypes.cache_config import CacheMode
 from pyignite.datatypes.prop_codes import PROP_NAME, PROP_BACKUPS_NUMBER, PROP_CACHE_KEY_CONFIGURATION, PROP_CACHE_MODE
 from tests.util import wait_for_condition, wait_for_condition_async, start_ignite, kill_process_tree
 
+try:
+    from contextlib import asynccontextmanager
+except ImportError:
+    from async_generator import asynccontextmanager
+
 requests = deque()
 old_send = Connection.send
 old_send_async = AioConnection._send
@@ -208,25 +214,36 @@ client_routed_connection_string = [('127.0.0.1', 10800 + idx) for idx in range(1
 
 
 @pytest.fixture
-def client_routed_cache(request):
+def client_routed():
     client = Client(partition_aware=True)
     try:
         client.connect(client_routed_connection_string)
-        yield client.get_or_create_cache(request.node.name)
+        yield client
     finally:
         client.close()
 
 
 @pytest.fixture
-async def async_client_routed_cache(request):
+def client_routed_cache(client_routed, request):
+    yield client_routed.get_or_create_cache(request.node.name)
+
+
+@pytest.fixture
+async def async_client_routed():
     client = AioClient(partition_aware=True)
     try:
         await client.connect(client_routed_connection_string)
-        yield await client.get_or_create_cache(request.node.name)
+        yield client
     finally:
         await client.close()
 
 
+@pytest.fixture
+async def async_client_routed_cache(async_client_routed, request):
+    cache = await async_client_routed.get_or_create_cache(request.node.name)
+    yield cache
+
+
 def test_cache_operation_routed_to_new_cluster_node(client_routed_cache):
     __perform_cache_operation_routed_to_new_node(client_routed_cache)
 
@@ -345,3 +362,114 @@ def verify_random_node(cache):
         assert idx1 != idx2
 
     return inner_async() if isinstance(cache, AioCache) else inner()
+
+
+@contextlib.contextmanager
+def create_caches(client):
+    caches = []
+    try:
+        caches = [client.create_cache(f'test_cache_{i}') for i in range(0, 10)]
+        yield caches
+    finally:
+        for cache in caches:
+            try:
+                cache.destroy()
+            except:  # noqa: 13
+                cache.destroy()  # Retry if connection failed.
+                pass
+
+
+@asynccontextmanager
+async def create_caches_async(client):
+    caches = []
+    try:
+        caches = await asyncio.gather(*[client.create_cache(f'test_cache_{i}') for i in range(0, 10)])
+        yield caches
+    finally:
+        for cache in caches:
+            try:
+                await cache.destroy()
+            except:  # noqa: 13
+                await cache.destroy()  # Retry if connection failed.
+                pass
+
+
+def test_new_registered_cache_affinity(client):
+    with create_caches(client) as caches:
+        key = 12
+        test_cache = random.choice(caches)
+        test_cache.put(key, key)
+        wait_for_affinity_distribution(test_cache, key, 3)
+
+        caches.append(client.create_cache('new_cache'))
+
+        for cache in caches:
+            cache.get(key)
+            assert requests.pop() == 3
+
+
+@pytest.mark.asyncio
+async def test_new_registered_cache_affinity_async(async_client):
+    async with create_caches_async(async_client) as caches:
+        key = 12
+        test_cache = random.choice(caches)
+        await test_cache.put(key, key)  # the put is a coroutine: without `await` it never runs
+        await wait_for_affinity_distribution_async(test_cache, key, 3)
+
+        caches.append(await async_client.create_cache('new_cache'))
+
+        for cache in caches:
+            await cache.get(key)
+            assert requests.pop() == 3
+
+
+def test_all_registered_cache_updated_on_new_server(client_routed):
+    with create_caches(client_routed) as caches:
+        key = 12
+        test_cache = random.choice(caches)
+        wait_for_affinity_distribution(test_cache, key, 3)
+        test_cache.put(key, key)
+        assert requests.pop() == 3
+
+        srv = start_ignite(idx=4)
+        try:
+            # Wait for rebalance and partition map exchange
+            wait_for_affinity_distribution(test_cache, key, 4)
+
+            for cache in caches:
+                cache.get(key)
+                assert requests.pop() == 4
+        finally:
+            kill_process_tree(srv.pid)
+
+
+@pytest.mark.asyncio
+async def test_all_registered_cache_updated_on_new_server_async(async_client_routed):
+    async with create_caches_async(async_client_routed) as caches:
+        key = 12
+        test_cache = random.choice(caches)
+        await wait_for_affinity_distribution_async(test_cache, key, 3)
+        await test_cache.put(key, key)
+        assert requests.pop() == 3
+
+        srv = start_ignite(idx=4)
+        try:
+            # Wait for rebalance and partition map exchange
+            await wait_for_affinity_distribution_async(test_cache, key, 4)
+
+            for cache in caches:
+                await cache.get(key)
+                assert requests.pop() == 4
+        finally:
+            kill_process_tree(srv.pid)
+
+
+@pytest.mark.asyncio
+async def test_update_affinity_concurrently(async_client):
+    async with create_caches_async(async_client) as caches:
+        key = 12
+        await asyncio.gather(*[cache.put(key, key) for cache in caches])
+
+        for cache in caches:
+            await cache.get(key)
+            assert requests.pop() == 3
diff --git a/tests/common/test_cache_class.py b/tests/common/test_cache_class.py
index 02dfa82..b035d8f 100644
--- a/tests/common/test_cache_class.py
+++ b/tests/common/test_cache_class.py
@@ -36,7 +36,7 @@ def test_cache_create(client):
 async def test_cache_create_async(async_client):
     cache = await async_client.get_or_create_cache('my_oop_cache')
     try:
-        assert (await cache.name()) == (await cache.settings())[PROP_NAME] == 'my_oop_cache'
+        assert cache.name == (await cache.settings())[PROP_NAME] == 'my_oop_cache'
     finally:
         await cache.destroy()
 
@@ -94,7 +94,7 @@ async def test_cache_config_async(async_client, cache_config):
     await async_client.create_cache(cache_config)
     cache = await async_client.get_or_create_cache('my_oop_cache')
     try:
-        assert await cache.name() == cache_config[PROP_NAME]
+        assert cache.name == cache_config[PROP_NAME]
         assert (await cache.settings())[PROP_CACHE_KEY_CONFIGURATION] == cache_config[PROP_CACHE_KEY_CONFIGURATION]
     finally:
         await cache.destroy()
diff --git a/tests/common/test_cache_size.py b/tests/common/test_cache_size.py
index d134903..f2ec3ed 100644
--- a/tests/common/test_cache_size.py
+++ b/tests/common/test_cache_size.py
@@ -15,8 +15,11 @@
 
 import pytest
 
+from pyignite.datatypes.cache_config import WriteSynchronizationMode
 from pyignite.datatypes.key_value import PeekModes
-from pyignite.datatypes.prop_codes import PROP_NAME, PROP_IS_ONHEAP_CACHE_ENABLED, PROP_BACKUPS_NUMBER
+from pyignite.datatypes.prop_codes import (
+    PROP_NAME, PROP_IS_ONHEAP_CACHE_ENABLED, PROP_BACKUPS_NUMBER, PROP_WRITE_SYNCHRONIZATION_MODE
+)
 from tests.util import get_or_create_cache, get_or_create_cache_async
 
 test_params = [
@@ -24,7 +27,8 @@ test_params = [
         {
             PROP_NAME: 'cache_onheap_backups_2',
             PROP_IS_ONHEAP_CACHE_ENABLED: True,
-            PROP_BACKUPS_NUMBER: 2
+            PROP_BACKUPS_NUMBER: 2,
+            PROP_WRITE_SYNCHRONIZATION_MODE: WriteSynchronizationMode.FULL_SYNC
         },
         [
             [None, 1],
diff --git a/tests/util.py b/tests/util.py
index 064ac7a..5651739 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -26,7 +26,6 @@ import signal
 import subprocess
 import time
 
-from pyignite import Client, AioClient
 
 try:
     from contextlib import asynccontextmanager