You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by is...@apache.org on 2021/03/31 12:49:36 UTC
[ignite-python-thin-client] branch master updated: IGNITE-14444
Move affinity mapping storage and best node calculation to clients
This is an automated email from the ASF dual-hosted git repository.
isapego pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-python-thin-client.git
The following commit(s) were added to refs/heads/master by this push:
new 7cbfe32 IGNITE-14444 Move affinity mapping storage and best node calculation to clients
7cbfe32 is described below
commit 7cbfe324eb2fb16335b51191cbed9b0e1dc8c88d
Author: Ivan Dashchinskiy <iv...@gmail.com>
AuthorDate: Wed Mar 31 15:49:07 2021 +0300
IGNITE-14444 Move affinity mapping storage and best node calculation to clients
This closes #26
---
pyignite/aio_cache.py | 183 ++++--------------
pyignite/aio_client.py | 92 ++++++++-
pyignite/cache.py | 240 ++++++------------------
pyignite/client.py | 161 +++++++++++++++-
tests/affinity/test_affinity.py | 4 +-
tests/affinity/test_affinity_request_routing.py | 136 +++++++++++++-
tests/common/test_cache_class.py | 4 +-
tests/common/test_cache_size.py | 8 +-
tests/util.py | 1 -
9 files changed, 476 insertions(+), 353 deletions(-)
diff --git a/pyignite/aio_cache.py b/pyignite/aio_cache.py
index a2af0a7..24d4bce 100644
--- a/pyignite/aio_cache.py
+++ b/pyignite/aio_cache.py
@@ -13,15 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
-from typing import Any, Dict, Iterable, Optional, Union
+from typing import Any, Iterable, Optional, Union
-from .constants import AFFINITY_RETRIES, AFFINITY_DELAY
-from .connection import AioConnection
-from .datatypes import prop_codes
-from .datatypes.base import IgniteDataType
from .datatypes.internal import AnyDataObject
-from .exceptions import CacheCreationError, CacheError, ParameterError, connection_errors
-from .utils import cache_id, status_to_exception
+from .exceptions import CacheCreationError, CacheError, ParameterError
+from .utils import status_to_exception
from .api.cache_config import (
cache_create_async, cache_get_or_create_async, cache_destroy_async, cache_get_configuration_async,
cache_create_with_config_async, cache_get_or_create_with_config_async
@@ -34,8 +30,7 @@ from .api.key_value import (
cache_remove_if_equals_async, cache_replace_if_equals_async, cache_get_size_async,
)
from .cursors import AioScanCursor
-from .api.affinity import cache_get_node_partitions_async
-from .cache import __parse_settings, BaseCacheMixin
+from .cache import __parse_settings, BaseCache
async def get_cache(client: 'AioClient', settings: Union[str, dict]) -> 'AioCache':
@@ -76,13 +71,13 @@ async def get_or_create_cache(client: 'AioClient', settings: Union[str, dict]) -
return AioCache(client, name)
-class AioCache(BaseCacheMixin):
+class AioCache(BaseCache):
"""
Ignite cache abstraction. Users should never use this class directly,
but construct its instances with
- :py:meth:`~pyignite.client.Client.create_cache`,
- :py:meth:`~pyignite.client.Client.get_or_create_cache` or
- :py:meth:`~pyignite.client.Client.get_cache` methods instead. See
+ :py:meth:`~pyignite.aio_client.AioClient.create_cache`,
+ :py:meth:`~pyignite.aio_client.AioClient.get_or_create_cache` or
+ :py:meth:`~pyignite.aio_client.AioClient.get_cache` methods instead. See
:ref:`this example <create_cache>` on how to do it.
"""
def __init__(self, client: 'AioClient', name: str):
@@ -92,12 +87,10 @@ class AioCache(BaseCacheMixin):
:param client: Async Ignite client,
:param name: Cache name.
"""
- self._client = client
- self._name = name
- self._cache_id = cache_id(self._name)
- self._settings = None
- self._affinity_query_mux = asyncio.Lock()
- self.affinity = {'version': (0, 0)}
+ super().__init__(client, name)
+
+ async def _get_best_node(self, key=None, key_hint=None):
+ return await self.client.get_best_node(self._cache_id, key, key_hint)
async def settings(self) -> Optional[dict]:
"""
@@ -109,7 +102,7 @@ class AioCache(BaseCacheMixin):
:return: dict of cache properties and their values.
"""
if self._settings is None:
- conn = await self.get_best_node()
+ conn = await self._get_best_node()
config_result = await cache_get_configuration_async(conn, self._cache_id)
if config_result.status == 0:
@@ -119,121 +112,15 @@ class AioCache(BaseCacheMixin):
return self._settings
- async def name(self) -> str:
- """
- Lazy cache name.
-
- :return: cache name string.
- """
- if self._name is None:
- settings = await self.settings()
- self._name = settings[prop_codes.PROP_NAME]
-
- return self._name
-
- @property
- def client(self) -> 'AioClient':
- """
- Ignite :class:`~pyignite.aio_client.AioClient` object.
-
- :return: Async client object, through which the cache is accessed.
- """
- return self._client
-
- @property
- def cache_id(self) -> int:
- """
- Cache ID.
-
- :return: integer value of the cache ID.
- """
- return self._cache_id
-
@status_to_exception(CacheError)
async def destroy(self):
"""
Destroys cache with a given name.
"""
- conn = await self.get_best_node()
+ conn = await self._get_best_node()
return await cache_destroy_async(conn, self._cache_id)
@status_to_exception(CacheError)
- async def _get_affinity(self, conn: 'AioConnection') -> Dict:
- """
- Queries server for affinity mappings. Retries in case
- of an intermittent error (most probably “Getting affinity for topology
- version earlier than affinity is calculated”).
-
- :param conn: connection to Igneite server,
- :return: OP_CACHE_PARTITIONS operation result value.
- """
- for _ in range(AFFINITY_RETRIES or 1):
- result = await cache_get_node_partitions_async(conn, self._cache_id)
- if result.status == 0 and result.value['partition_mapping']:
- break
- await asyncio.sleep(AFFINITY_DELAY)
-
- return result
-
- async def get_best_node(self, key: Any = None, key_hint: 'IgniteDataType' = None) -> 'AioConnection':
- """
- Returns the node from the list of the nodes, opened by client, that
- most probably contains the needed key-value pair. See IEP-23.
-
- This method is not a part of the public API. Unless you wish to
- extend the `pyignite` capabilities (with additional testing, logging,
- examining connections, et c.) you probably should not use it.
-
- :param key: (optional) pythonic key,
- :param key_hint: (optional) Ignite data type, for which the given key
- should be converted,
- :return: Ignite connection object.
- """
- conn = await self._client.random_node()
-
- if self.client.partition_aware and key is not None:
- if self.__should_update_mapping():
- async with self._affinity_query_mux:
- while self.__should_update_mapping():
- try:
- full_affinity = await self._get_affinity(conn)
- self._update_affinity(full_affinity)
-
- asyncio.ensure_future(
- asyncio.gather(
- *[conn.reconnect() for conn in self.client._nodes if not conn.alive],
- return_exceptions=True
- )
- )
-
- break
- except connection_errors:
- # retry if connection failed
- conn = await self._client.random_node()
- pass
- except CacheError:
- # server did not create mapping in time
- return conn
-
- parts = self.affinity.get('number_of_partitions')
-
- if not parts:
- return conn
-
- key, key_hint = self._get_affinity_key(key, key_hint)
-
- hashcode = await key_hint.hashcode_async(key, self._client)
-
- best_node = self._get_node_by_hashcode(hashcode, parts)
- if best_node:
- return best_node
-
- return conn
-
- def __should_update_mapping(self):
- return self.affinity['version'] < self._client.affinity_version
-
- @status_to_exception(CacheError)
async def get(self, key, key_hint: object = None) -> Any:
"""
Retrieves a value from cache by key.
@@ -246,7 +133,7 @@ class AioCache(BaseCacheMixin):
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
- conn = await self.get_best_node(key, key_hint)
+ conn = await self._get_best_node(key, key_hint)
result = await cache_get_async(conn, self._cache_id, key, key_hint=key_hint)
result.value = await self.client.unwrap_binary(result.value)
return result
@@ -267,7 +154,7 @@ class AioCache(BaseCacheMixin):
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
- conn = await self.get_best_node(key, key_hint)
+ conn = await self._get_best_node(key, key_hint)
return await cache_put_async(conn, self._cache_id, key, value, key_hint=key_hint, value_hint=value_hint)
@status_to_exception(CacheError)
@@ -278,7 +165,7 @@ class AioCache(BaseCacheMixin):
:param keys: list of keys or tuples of (key, key_hint),
:return: a dict of key-value pairs.
"""
- conn = await self.get_best_node()
+ conn = await self._get_best_node()
result = await cache_get_all_async(conn, self._cache_id, keys)
if result.value:
keys = list(result.value.keys())
@@ -298,7 +185,7 @@ class AioCache(BaseCacheMixin):
to save. Each key or value can be an item of representable
Python type or a tuple of (item, hint),
"""
- conn = await self.get_best_node()
+ conn = await self._get_best_node()
return await cache_put_all_async(conn, self._cache_id, pairs)
@status_to_exception(CacheError)
@@ -316,7 +203,7 @@ class AioCache(BaseCacheMixin):
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
- conn = await self.get_best_node(key, key_hint)
+ conn = await self._get_best_node(key, key_hint)
result = await cache_replace_async(conn, self._cache_id, key, value, key_hint=key_hint, value_hint=value_hint)
result.value = await self.client.unwrap_binary(result.value)
return result
@@ -329,7 +216,7 @@ class AioCache(BaseCacheMixin):
:param keys: (optional) list of cache keys or (key, key type
hint) tuples to clear (default: clear all).
"""
- conn = await self.get_best_node()
+ conn = await self._get_best_node()
if keys:
return await cache_clear_keys_async(conn, self._cache_id, keys)
else:
@@ -347,7 +234,7 @@ class AioCache(BaseCacheMixin):
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
- conn = await self.get_best_node(key, key_hint)
+ conn = await self._get_best_node(key, key_hint)
return await cache_clear_key_async(conn, self._cache_id, key, key_hint=key_hint)
@status_to_exception(CacheError)
@@ -357,7 +244,7 @@ class AioCache(BaseCacheMixin):
:param keys: a list of keys or (key, type hint) tuples
"""
- conn = await self.get_best_node()
+ conn = await self._get_best_node()
return await cache_clear_keys_async(conn, self._cache_id, keys)
@status_to_exception(CacheError)
@@ -373,7 +260,7 @@ class AioCache(BaseCacheMixin):
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
- conn = await self.get_best_node(key, key_hint)
+ conn = await self._get_best_node(key, key_hint)
return await cache_contains_key_async(conn, self._cache_id, key, key_hint=key_hint)
@status_to_exception(CacheError)
@@ -384,7 +271,7 @@ class AioCache(BaseCacheMixin):
:param keys: a list of keys or (key, type hint) tuples,
:return: boolean `True` when all keys are present, `False` otherwise.
"""
- conn = await self.get_best_node()
+ conn = await self._get_best_node()
return await cache_contains_keys_async(conn, self._cache_id, keys)
@status_to_exception(CacheError)
@@ -404,7 +291,7 @@ class AioCache(BaseCacheMixin):
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
- conn = await self.get_best_node(key, key_hint)
+ conn = await self._get_best_node(key, key_hint)
result = await cache_get_and_put_async(conn, self._cache_id, key, value, key_hint, value_hint)
result.value = await self.client.unwrap_binary(result.value)
@@ -427,7 +314,7 @@ class AioCache(BaseCacheMixin):
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
- conn = await self.get_best_node(key, key_hint)
+ conn = await self._get_best_node(key, key_hint)
result = await cache_get_and_put_if_absent_async(conn, self._cache_id, key, value, key_hint, value_hint)
result.value = await self.client.unwrap_binary(result.value)
return result
@@ -448,7 +335,7 @@ class AioCache(BaseCacheMixin):
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
- conn = await self.get_best_node(key, key_hint)
+ conn = await self._get_best_node(key, key_hint)
return await cache_put_if_absent_async(conn, self._cache_id, key, value, key_hint, value_hint)
@status_to_exception(CacheError)
@@ -464,7 +351,7 @@ class AioCache(BaseCacheMixin):
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
- conn = await self.get_best_node(key, key_hint)
+ conn = await self._get_best_node(key, key_hint)
result = await cache_get_and_remove_async(conn, self._cache_id, key, key_hint)
result.value = await self.client.unwrap_binary(result.value)
return result
@@ -487,7 +374,7 @@ class AioCache(BaseCacheMixin):
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
- conn = await self.get_best_node(key, key_hint)
+ conn = await self._get_best_node(key, key_hint)
result = await cache_get_and_replace_async(conn, self._cache_id, key, value, key_hint, value_hint)
result.value = await self.client.unwrap_binary(result.value)
return result
@@ -504,7 +391,7 @@ class AioCache(BaseCacheMixin):
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
- conn = await self.get_best_node(key, key_hint)
+ conn = await self._get_best_node(key, key_hint)
return await cache_remove_key_async(conn, self._cache_id, key, key_hint)
@status_to_exception(CacheError)
@@ -515,7 +402,7 @@ class AioCache(BaseCacheMixin):
:param keys: list of keys or tuples of (key, key_hint) to remove.
"""
- conn = await self.get_best_node()
+ conn = await self._get_best_node()
return await cache_remove_keys_async(conn, self._cache_id, keys)
@status_to_exception(CacheError)
@@ -523,7 +410,7 @@ class AioCache(BaseCacheMixin):
"""
Removes all cache entries, notifying listeners and cache writers.
"""
- conn = await self.get_best_node()
+ conn = await self._get_best_node()
return await cache_remove_all_async(conn, self._cache_id)
@status_to_exception(CacheError)
@@ -542,7 +429,7 @@ class AioCache(BaseCacheMixin):
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
- conn = await self.get_best_node(key, key_hint)
+ conn = await self._get_best_node(key, key_hint)
return await cache_remove_if_equals_async(conn, self._cache_id, key, sample, key_hint, sample_hint)
@status_to_exception(CacheError)
@@ -565,7 +452,7 @@ class AioCache(BaseCacheMixin):
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
- conn = await self.get_best_node(key, key_hint)
+ conn = await self._get_best_node(key, key_hint)
result = await cache_replace_if_equals_async(conn, self._cache_id, key, sample, value, key_hint, sample_hint,
value_hint)
result.value = await self.client.unwrap_binary(result.value)
@@ -581,7 +468,7 @@ class AioCache(BaseCacheMixin):
(PeekModes.BACKUP). Defaults to primary cache partitions (PeekModes.PRIMARY),
:return: integer number of cache entries.
"""
- conn = await self.get_best_node()
+ conn = await self._get_best_node()
return await cache_get_size_async(conn, self._cache_id, peek_modes)
def scan(self, page_size: int = 1, partitions: int = -1, local: bool = False):
diff --git a/pyignite/aio_client.py b/pyignite/aio_client.py
index d2cc3ff..5e64450 100644
--- a/pyignite/aio_client.py
+++ b/pyignite/aio_client.py
@@ -15,19 +15,21 @@
import asyncio
import random
from itertools import chain
-from typing import Iterable, Type, Union, Any
+from typing import Iterable, Type, Union, Any, Dict
+from .api import cache_get_node_partitions_async
from .api.binary import get_binary_type_async, put_binary_type_async
from .api.cache_config import cache_get_names_async
+from .cache import BaseCache
from .client import BaseClient
from .cursors import AioSqlFieldsCursor
from .aio_cache import AioCache, get_cache, create_cache, get_or_create_cache
from .connection import AioConnection
-from .constants import IGNITE_DEFAULT_HOST, IGNITE_DEFAULT_PORT
+from .constants import AFFINITY_RETRIES, AFFINITY_DELAY
from .datatypes import BinaryObject
from .exceptions import BinaryTypeError, CacheError, ReconnectError, connection_errors
from .stream import AioBinaryStream, READ_BACKWARD
-from .utils import cache_id, entity_id, status_to_exception, is_iterable, is_wrapped
+from .utils import cache_id, entity_id, status_to_exception, is_wrapped
__all__ = ['AioClient']
@@ -72,6 +74,7 @@ class AioClient(BaseClient):
"""
super().__init__(compact_footer, partition_aware, **kwargs)
self._registry_mux = asyncio.Lock()
+ self._affinity_query_mux = asyncio.Lock()
def connect(self, *args):
"""
@@ -271,6 +274,89 @@ class AioClient(BaseClient):
return await BinaryObject.to_python_async(stream.read_ctype(data_class, direction=READ_BACKWARD), self)
return value
+ @status_to_exception(CacheError)
+ async def _get_affinity(self, conn: 'AioConnection', caches: Iterable[int]) -> Dict:
+ """
+ Queries server for affinity mappings. Retries in case
+ of an intermittent error (most probably “Getting affinity for topology
+ version earlier than affinity is calculated”).
+
+ :param conn: connection to Ignite server,
+ :param caches: Ids of caches,
+ :return: OP_CACHE_PARTITIONS operation result value.
+ """
+ for _ in range(AFFINITY_RETRIES or 1):
+ result = await cache_get_node_partitions_async(conn, caches)
+ if result.status == 0 and result.value['partition_mapping']:
+ break
+ await asyncio.sleep(AFFINITY_DELAY)
+
+ return result
+
+ async def get_best_node(
+ self, cache: Union[int, str, 'BaseCache'], key: Any = None, key_hint: 'IgniteDataType' = None
+ ) -> 'AioConnection':
+ """
+ Returns the node from the list of the nodes, opened by client, that
+ most probably contains the needed key-value pair. See IEP-23.
+
+ This method is not a part of the public API. Unless you wish to
+ extend the `pyignite` capabilities (with additional testing, logging,
+ examining connections, etc.) you probably should not use it.
+
+ :param cache: Ignite cache, cache name or cache id,
+ :param key: (optional) pythonic key,
+ :param key_hint: (optional) Ignite data type, for which the given key
+ should be converted,
+ :return: Ignite connection object.
+ """
+ conn = await self.random_node()
+
+ if self.partition_aware and key is not None:
+ caches = self._caches_to_update_affinity()
+ if caches:
+ async with self._affinity_query_mux:
+ while True:
+ caches = self._caches_to_update_affinity()
+ if not caches:
+ break
+
+ try:
+ full_affinity = await self._get_affinity(conn, caches)
+ self._update_affinity(full_affinity)
+
+ asyncio.ensure_future(
+ asyncio.gather(
+ *[conn.reconnect() for conn in self._nodes if not conn.alive],
+ return_exceptions=True
+ )
+ )
+
+ break
+ except connection_errors:
+ # retry if connection failed
+ conn = await self.random_node()
+ pass
+ except CacheError:
+ # server did not create mapping in time
+ return conn
+
+ c_id = cache.cache_id if isinstance(cache, BaseCache) else cache_id(cache)
+ parts = self._cache_partition_mapping(c_id).get('number_of_partitions')
+
+ if not parts:
+ return conn
+
+ key, key_hint = self._get_affinity_key(c_id, key, key_hint)
+
+ hashcode = await key_hint.hashcode_async(key, self)
+
+ best_node = self._get_node_by_hashcode(c_id, hashcode, parts)
+ if best_node:
+ return best_node
+
+ return conn
+
async def create_cache(self, settings: Union[str, dict]) -> 'AioCache':
"""
Creates Ignite cache by name. Raises `CacheError` if such a cache is
diff --git a/pyignite/cache.py b/pyignite/cache.py
index 2602d1c..f00f000 100644
--- a/pyignite/cache.py
+++ b/pyignite/cache.py
@@ -13,15 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import time
-from typing import Any, Dict, Iterable, Optional, Tuple, Union
+from typing import Any, Iterable, Optional, Tuple, Union
-from .constants import AFFINITY_RETRIES, AFFINITY_DELAY
-from .binary import GenericObjectMeta
from .datatypes import prop_codes
from .datatypes.internal import AnyDataObject
-from .exceptions import CacheCreationError, CacheError, ParameterError, SQLError, connection_errors
-from .utils import cache_id, get_field_by_id, status_to_exception, unsigned
+from .exceptions import CacheCreationError, CacheError, ParameterError, SQLError
+from .utils import cache_id, status_to_exception
from .api.cache_config import (
cache_create, cache_create_with_config, cache_get_or_create, cache_get_or_create_with_config, cache_destroy,
cache_get_configuration
@@ -33,7 +30,6 @@ from .api.key_value import (
cache_remove_if_equals, cache_replace_if_equals, cache_get_size
)
from .cursors import ScanCursor, SqlCursor
-from .api.affinity import cache_get_node_partitions
PROP_CODES = set([
getattr(prop_codes, x)
@@ -96,65 +92,39 @@ def __parse_settings(settings: Union[str, dict]) -> Tuple[Optional[str], Optiona
raise ParameterError('You should supply at least cache name')
-class BaseCacheMixin:
- def _get_affinity_key(self, key, key_hint=None):
- if key_hint is None:
- key_hint = AnyDataObject.map_python_type(key)
-
- if self.affinity.get('is_applicable'):
- config = self.affinity.get('cache_config')
- if config:
- affinity_key_id = config.get(key_hint.type_id)
-
- if affinity_key_id and isinstance(key, GenericObjectMeta):
- return get_field_by_id(key, affinity_key_id)
-
- return key, key_hint
-
- def _update_affinity(self, full_affinity):
- self.affinity['version'] = full_affinity['version']
-
- full_mapping = full_affinity.get('partition_mapping')
- if full_mapping and self.cache_id in full_mapping:
- self.affinity.update(full_mapping[self.cache_id])
+class BaseCache:
+ def __init__(self, client: 'BaseClient', name: str):
+ self._client = client
+ self._name = name
+ self._settings = None
+ self._cache_id = cache_id(self._name)
+ self._client.register_cache(self._cache_id)
- def _get_node_by_hashcode(self, hashcode, parts):
+ @property
+ def name(self) -> str:
"""
- Get node by key hashcode. Calculate partition and return node on that it is primary.
- (algorithm is taken from `RendezvousAffinityFunction.java`)
+ :return: cache name string.
"""
+ return self._name
- # calculate partition for key or affinity key
- # (algorithm is taken from `RendezvousAffinityFunction.java`)
- mask = parts - 1
-
- if parts & mask == 0:
- part = (hashcode ^ (unsigned(hashcode) >> 16)) & mask
- else:
- part = abs(hashcode // parts)
-
- assert 0 <= part < parts, 'Partition calculation has failed'
-
- node_mapping = self.affinity.get('node_mapping')
- if not node_mapping:
- return None
+ @property
+ def client(self) -> 'BaseClient':
+ """
+ :return: Client object, through which the cache is accessed.
+ """
+ return self._client
- node_uuid, best_conn = None, None
- for u, p in node_mapping.items():
- if part in p:
- node_uuid = u
- break
+ @property
+ def cache_id(self) -> int:
+ """
+ Cache ID.
- if node_uuid:
- for n in self.client._nodes:
- if n.uuid == node_uuid:
- best_conn = n
- break
- if best_conn and best_conn.alive:
- return best_conn
+ :return: integer value of the cache ID.
+ """
+ return self._cache_id
-class Cache(BaseCacheMixin):
+class Cache(BaseCache):
"""
Ignite cache abstraction. Users should never use this class directly,
but construct its instances with
@@ -171,11 +141,10 @@ class Cache(BaseCacheMixin):
:param client: Ignite client,
:param name: Cache name.
"""
- self._client = client
- self._name = name
- self._settings = None
- self._cache_id = cache_id(self._name)
- self.affinity = {'version': (0, 0)}
+ super().__init__(client, name)
+
+ def _get_best_node(self, key=None, key_hint=None):
+ return self.client.get_best_node(self._cache_id, key, key_hint)
@property
def settings(self) -> Optional[dict]:
@@ -189,7 +158,7 @@ class Cache(BaseCacheMixin):
"""
if self._settings is None:
config_result = cache_get_configuration(
- self.get_best_node(),
+ self._get_best_node(),
self._cache_id
)
if config_result.status == 0:
@@ -199,111 +168,12 @@ class Cache(BaseCacheMixin):
return self._settings
- @property
- def name(self) -> str:
- """
- Lazy cache name.
-
- :return: cache name string.
- """
- if self._name is None:
- self._name = self.settings[prop_codes.PROP_NAME]
-
- return self._name
-
- @property
- def client(self) -> 'Client':
- """
- Ignite :class:`~pyignite.client.Client` object.
-
- :return: Client object, through which the cache is accessed.
- """
- return self._client
-
- @property
- def cache_id(self) -> int:
- """
- Cache ID.
-
- :return: integer value of the cache ID.
- """
- return self._cache_id
-
@status_to_exception(CacheError)
def destroy(self):
"""
Destroys cache with a given name.
"""
- return cache_destroy(self.get_best_node(), self._cache_id)
-
- @status_to_exception(CacheError)
- def _get_affinity(self, conn: 'Connection') -> Dict:
- """
- Queries server for affinity mappings. Retries in case
- of an intermittent error (most probably “Getting affinity for topology
- version earlier than affinity is calculated”).
-
- :param conn: connection to Igneite server,
- :return: OP_CACHE_PARTITIONS operation result value.
- """
- for _ in range(AFFINITY_RETRIES or 1):
- result = cache_get_node_partitions(conn, self._cache_id)
- if result.status == 0 and result.value['partition_mapping']:
- break
- time.sleep(AFFINITY_DELAY)
-
- return result
-
- def get_best_node(self, key: Any = None, key_hint: 'IgniteDataType' = None) -> 'Connection':
- """
- Returns the node from the list of the nodes, opened by client, that
- most probably contains the needed key-value pair. See IEP-23.
-
- This method is not a part of the public API. Unless you wish to
- extend the `pyignite` capabilities (with additional testing, logging,
- examining connections, et c.) you probably should not use it.
-
- :param key: (optional) pythonic key,
- :param key_hint: (optional) Ignite data type, for which the given key
- should be converted,
- :return: Ignite connection object.
- """
- conn = self._client.random_node
-
- if self.client.partition_aware and key is not None:
- if self.affinity['version'] < self._client.affinity_version:
- # update partition mapping
- while True:
- try:
- full_affinity = self._get_affinity(conn)
- break
- except connection_errors:
- # retry if connection failed
- conn = self._client.random_node
- pass
- except CacheError:
- # server did not create mapping in time
- return conn
-
- self._update_affinity(full_affinity)
-
- for conn in self.client._nodes:
- if not conn.alive:
- conn.reconnect()
-
- parts = self.affinity.get('number_of_partitions')
-
- if not parts:
- return conn
-
- key, key_hint = self._get_affinity_key(key, key_hint)
- hashcode = key_hint.hashcode(key, self._client)
-
- best_node = self._get_node_by_hashcode(hashcode, parts)
- if best_node:
- return best_node
-
- return conn
+ return cache_destroy(self._get_best_node(), self._cache_id)
@status_to_exception(CacheError)
def get(self, key, key_hint: object = None) -> Any:
@@ -319,7 +189,7 @@ class Cache(BaseCacheMixin):
key_hint = AnyDataObject.map_python_type(key)
result = cache_get(
- self.get_best_node(key, key_hint),
+ self._get_best_node(key, key_hint),
self._cache_id,
key,
key_hint=key_hint
@@ -346,7 +216,7 @@ class Cache(BaseCacheMixin):
key_hint = AnyDataObject.map_python_type(key)
return cache_put(
- self.get_best_node(key, key_hint),
+ self._get_best_node(key, key_hint),
self._cache_id, key, value,
key_hint=key_hint, value_hint=value_hint
)
@@ -359,7 +229,7 @@ class Cache(BaseCacheMixin):
:param keys: list of keys or tuples of (key, key_hint),
:return: a dict of key-value pairs.
"""
- result = cache_get_all(self.get_best_node(), self._cache_id, keys)
+ result = cache_get_all(self._get_best_node(), self._cache_id, keys)
if result.value:
for key, value in result.value.items():
result.value[key] = self.client.unwrap_binary(value)
@@ -375,7 +245,7 @@ class Cache(BaseCacheMixin):
to save. Each key or value can be an item of representable
Python type or a tuple of (item, hint),
"""
- return cache_put_all(self.get_best_node(), self._cache_id, pairs)
+ return cache_put_all(self._get_best_node(), self._cache_id, pairs)
@status_to_exception(CacheError)
def replace(
@@ -395,7 +265,7 @@ class Cache(BaseCacheMixin):
key_hint = AnyDataObject.map_python_type(key)
result = cache_replace(
- self.get_best_node(key, key_hint),
+ self._get_best_node(key, key_hint),
self._cache_id, key, value,
key_hint=key_hint, value_hint=value_hint
)
@@ -410,7 +280,7 @@ class Cache(BaseCacheMixin):
:param keys: (optional) list of cache keys or (key, key type
hint) tuples to clear (default: clear all).
"""
- conn = self.get_best_node()
+ conn = self._get_best_node()
if keys:
return cache_clear_keys(conn, self._cache_id, keys)
else:
@@ -429,7 +299,7 @@ class Cache(BaseCacheMixin):
key_hint = AnyDataObject.map_python_type(key)
return cache_clear_key(
- self.get_best_node(key, key_hint),
+ self._get_best_node(key, key_hint),
self._cache_id,
key,
key_hint=key_hint
@@ -443,7 +313,7 @@ class Cache(BaseCacheMixin):
:param keys: a list of keys or (key, type hint) tuples
"""
- return cache_clear_keys(self.get_best_node(), self._cache_id, keys)
+ return cache_clear_keys(self._get_best_node(), self._cache_id, keys)
@status_to_exception(CacheError)
def contains_key(self, key, key_hint=None) -> bool:
@@ -459,7 +329,7 @@ class Cache(BaseCacheMixin):
key_hint = AnyDataObject.map_python_type(key)
return cache_contains_key(
- self.get_best_node(key, key_hint),
+ self._get_best_node(key, key_hint),
self._cache_id,
key,
key_hint=key_hint
@@ -473,7 +343,7 @@ class Cache(BaseCacheMixin):
:param keys: a list of keys or (key, type hint) tuples,
:return: boolean `True` when all keys are present, `False` otherwise.
"""
- return cache_contains_keys(self.get_best_node(), self._cache_id, keys)
+ return cache_contains_keys(self._get_best_node(), self._cache_id, keys)
@status_to_exception(CacheError)
def get_and_put(self, key, value, key_hint=None, value_hint=None) -> Any:
@@ -493,7 +363,7 @@ class Cache(BaseCacheMixin):
key_hint = AnyDataObject.map_python_type(key)
result = cache_get_and_put(
- self.get_best_node(key, key_hint),
+ self._get_best_node(key, key_hint),
self._cache_id,
key, value,
key_hint, value_hint
@@ -521,7 +391,7 @@ class Cache(BaseCacheMixin):
key_hint = AnyDataObject.map_python_type(key)
result = cache_get_and_put_if_absent(
- self.get_best_node(key, key_hint),
+ self._get_best_node(key, key_hint),
self._cache_id,
key, value,
key_hint, value_hint
@@ -546,7 +416,7 @@ class Cache(BaseCacheMixin):
key_hint = AnyDataObject.map_python_type(key)
return cache_put_if_absent(
- self.get_best_node(key, key_hint),
+ self._get_best_node(key, key_hint),
self._cache_id,
key, value,
key_hint, value_hint
@@ -566,7 +436,7 @@ class Cache(BaseCacheMixin):
key_hint = AnyDataObject.map_python_type(key)
result = cache_get_and_remove(
- self.get_best_node(key, key_hint),
+ self._get_best_node(key, key_hint),
self._cache_id,
key,
key_hint
@@ -595,7 +465,7 @@ class Cache(BaseCacheMixin):
key_hint = AnyDataObject.map_python_type(key)
result = cache_get_and_replace(
- self.get_best_node(key, key_hint),
+ self._get_best_node(key, key_hint),
self._cache_id,
key, value,
key_hint, value_hint
@@ -616,7 +486,7 @@ class Cache(BaseCacheMixin):
key_hint = AnyDataObject.map_python_type(key)
return cache_remove_key(
- self.get_best_node(key, key_hint), self._cache_id, key, key_hint
+ self._get_best_node(key, key_hint), self._cache_id, key, key_hint
)
@status_to_exception(CacheError)
@@ -628,7 +498,7 @@ class Cache(BaseCacheMixin):
:param keys: list of keys or tuples of (key, key_hint) to remove.
"""
return cache_remove_keys(
- self.get_best_node(), self._cache_id, keys
+ self._get_best_node(), self._cache_id, keys
)
@status_to_exception(CacheError)
@@ -636,7 +506,7 @@ class Cache(BaseCacheMixin):
"""
Removes all cache entries, notifying listeners and cache writers.
"""
- return cache_remove_all(self.get_best_node(), self._cache_id)
+ return cache_remove_all(self._get_best_node(), self._cache_id)
@status_to_exception(CacheError)
def remove_if_equals(self, key, sample, key_hint=None, sample_hint=None):
@@ -655,7 +525,7 @@ class Cache(BaseCacheMixin):
key_hint = AnyDataObject.map_python_type(key)
return cache_remove_if_equals(
- self.get_best_node(key, key_hint),
+ self._get_best_node(key, key_hint),
self._cache_id,
key, sample,
key_hint, sample_hint
@@ -685,7 +555,7 @@ class Cache(BaseCacheMixin):
key_hint = AnyDataObject.map_python_type(key)
result = cache_replace_if_equals(
- self.get_best_node(key, key_hint),
+ self._get_best_node(key, key_hint),
self._cache_id,
key, sample, value,
key_hint, sample_hint, value_hint
@@ -704,7 +574,7 @@ class Cache(BaseCacheMixin):
:return: integer number of cache entries.
"""
return cache_get_size(
- self.get_best_node(), self._cache_id, peek_modes
+ self._get_best_node(), self._cache_id, peek_modes
)
def scan(self, page_size: int = 1, partitions: int = -1, local: bool = False):
diff --git a/pyignite/client.py b/pyignite/client.py
index 05df617..2f24c43 100644
--- a/pyignite/client.py
+++ b/pyignite/client.py
@@ -39,25 +39,28 @@ It returns a generator with result rows.
:py:meth:`~pyignite.client.Client.query_binary_type` methods operates
the local (class-wise) registry for Ignite Complex objects.
"""
-
+import time
from collections import defaultdict, OrderedDict
import random
import re
from itertools import chain
-from typing import Iterable, Type, Union, Any
+from typing import Iterable, Type, Union, Any, Dict
+from .api import cache_get_node_partitions
from .api.binary import get_binary_type, put_binary_type
from .api.cache_config import cache_get_names
from .cursors import SqlFieldsCursor
-from .cache import Cache, create_cache, get_cache, get_or_create_cache
+from .cache import Cache, create_cache, get_cache, get_or_create_cache, BaseCache
from .connection import Connection
-from .constants import IGNITE_DEFAULT_HOST, IGNITE_DEFAULT_PORT, PROTOCOL_BYTE_ORDER
-from .datatypes import BinaryObject
+from .constants import IGNITE_DEFAULT_HOST, IGNITE_DEFAULT_PORT, PROTOCOL_BYTE_ORDER, AFFINITY_RETRIES, AFFINITY_DELAY
+from .datatypes import BinaryObject, AnyDataObject
+from .datatypes.base import IgniteDataType
from .datatypes.internal import tc_map
from .exceptions import BinaryTypeError, CacheError, ReconnectError, connection_errors
from .stream import BinaryStream, READ_BACKWARD
from .utils import (
- cache_id, capitalize, entity_id, schema_id, process_delimiter, status_to_exception, is_iterable, is_wrapped
+ cache_id, capitalize, entity_id, schema_id, process_delimiter, status_to_exception, is_iterable, is_wrapped,
+ get_field_by_id, unsigned
)
from .binary import GenericObjectMeta
@@ -79,6 +82,7 @@ class BaseClient:
self._current_node = 0
self._partition_aware = partition_aware
self.affinity_version = (0, 0)
+ self._affinity = {'version': self.affinity_version, 'partition_mapping': defaultdict(dict)}
self._protocol_version = None
@property
@@ -242,6 +246,76 @@ class BaseClient:
return None
return self._registry[type_id]
+ def register_cache(self, cache_id: int):
+ if self.partition_aware and cache_id not in self._affinity:
+ self._affinity['partition_mapping'][cache_id] = {}
+
+ def _get_affinity_key(self, cache_id, key, key_hint=None):
+ if key_hint is None:
+ key_hint = AnyDataObject.map_python_type(key)
+
+ cache_partition_mapping = self._cache_partition_mapping(cache_id)
+ if cache_partition_mapping and cache_partition_mapping.get('is_applicable'):
+ config = cache_partition_mapping.get('cache_config')
+ if config:
+ affinity_key_id = config.get(key_hint.type_id)
+
+ if affinity_key_id and isinstance(key, GenericObjectMeta):
+ return get_field_by_id(key, affinity_key_id)
+
+ return key, key_hint
+
+ def _update_affinity(self, full_affinity):
+ self._affinity['version'] = full_affinity['version']
+
+ full_mapping = full_affinity.get('partition_mapping')
+ if full_mapping:
+ self._affinity['partition_mapping'].update(full_mapping)
+
+ def _caches_to_update_affinity(self):
+ if self._affinity['version'] < self.affinity_version:
+ return list(self._affinity['partition_mapping'].keys())
+ else:
+ return list(c_id for c_id, c_mapping in self._affinity['partition_mapping'].items() if not c_mapping)
+
+ def _cache_partition_mapping(self, cache_id):
+ return self._affinity['partition_mapping'][cache_id]
+
+ def _get_node_by_hashcode(self, cache_id, hashcode, parts):
+ """
+ Get node by key hashcode. Calculate the partition and return the node that is primary for it.
+ (algorithm is taken from `RendezvousAffinityFunction.java`)
+ """
+
+ # calculate partition for key or affinity key
+ # (algorithm is taken from `RendezvousAffinityFunction.java`)
+ mask = parts - 1
+
+ if parts & mask == 0:
+ part = (hashcode ^ (unsigned(hashcode) >> 16)) & mask
+ else:
+ part = abs(hashcode // parts)
+
+ assert 0 <= part < parts, 'Partition calculation has failed'
+
+ node_mapping = self._cache_partition_mapping(cache_id).get('node_mapping')
+ if not node_mapping:
+ return None
+
+ node_uuid, best_conn = None, None
+ for u, p in node_mapping.items():
+ if part in p:
+ node_uuid = u
+ break
+
+ if node_uuid:
+ for n in self._nodes:
+ if n.uuid == node_uuid:
+ best_conn = n
+ break
+ if best_conn and best_conn.alive:
+ return best_conn
+
class _ConnectionContextManager:
def __init__(self, client, nodes):
@@ -476,6 +550,81 @@ class Client(BaseClient):
return BinaryObject.to_python(stream.read_ctype(data_class, direction=READ_BACKWARD), self)
return value
+ @status_to_exception(CacheError)
+ def _get_affinity(self, conn: 'Connection', caches: Iterable[int]) -> Dict:
+ """
+ Queries server for affinity mappings. Retries in case
+ of an intermittent error (most probably “Getting affinity for topology
+ version earlier than affinity is calculated”).
+
+ :param conn: connection to Ignite server,
+ :param caches: IDs of caches,
+ :return: OP_CACHE_PARTITIONS operation result value.
+ """
+ for _ in range(AFFINITY_RETRIES or 1):
+ result = cache_get_node_partitions(conn, caches)
+ if result.status == 0 and result.value['partition_mapping']:
+ break
+ time.sleep(AFFINITY_DELAY)
+
+ return result
+
+ def get_best_node(
+ self, cache: Union[int, str, 'BaseCache'], key: Any = None, key_hint: 'IgniteDataType' = None
+ ) -> 'Connection':
+ """
+ Returns the node from the list of the nodes, opened by client, that
+ most probably contains the needed key-value pair. See IEP-23.
+
+ This method is not a part of the public API. Unless you wish to
+ extend the `pyignite` capabilities (with additional testing, logging,
+ examining connections, etc.) you probably should not use it.
+
+ :param cache: Ignite cache, cache name or cache id,
+ :param key: (optional) pythonic key,
+ :param key_hint: (optional) Ignite data type, to which the given key
+ should be converted,
+ :return: Ignite connection object.
+ """
+ conn = self.random_node
+
+ if self.partition_aware and key is not None:
+ caches = self._caches_to_update_affinity()
+ if caches:
+ # update partition mapping
+ while True:
+ try:
+ full_affinity = self._get_affinity(conn, caches)
+ break
+ except connection_errors:
+ # retry if connection failed
+ conn = self.random_node
+ pass
+ except CacheError:
+ # server did not create mapping in time
+ return conn
+
+ self._update_affinity(full_affinity)
+
+ for conn in self._nodes:
+ if not conn.alive:
+ conn.reconnect()
+
+ c_id = cache.cache_id if isinstance(cache, BaseCache) else cache_id(cache)
+ parts = self._cache_partition_mapping(c_id).get('number_of_partitions')
+
+ if not parts:
+ return conn
+
+ key, key_hint = self._get_affinity_key(c_id, key, key_hint)
+ hashcode = key_hint.hashcode(key, self)
+
+ best_node = self._get_node_by_hashcode(c_id, hashcode, parts)
+ if best_node:
+ return best_node
+
+ return conn
+
def create_cache(self, settings: Union[str, dict]) -> 'Cache':
"""
Creates Ignite cache by name. Raises `CacheError` if such a cache is
diff --git a/tests/affinity/test_affinity.py b/tests/affinity/test_affinity.py
index b1bcec7..64b9cc5 100644
--- a/tests/affinity/test_affinity.py
+++ b/tests/affinity/test_affinity.py
@@ -309,7 +309,7 @@ def __check_best_node_calculation(client, cache, key, value, key_hint=None):
def inner():
cache.put(key, value, key_hint=key_hint)
- best_node = cache.get_best_node(key, key_hint=key_hint)
+ best_node = client.get_best_node(cache, key, key_hint=key_hint)
for node in filter(lambda n: n.alive, client._nodes):
result = cache_local_peek(node, cache.cache_id, key, key_hint=key_hint)
@@ -318,7 +318,7 @@ def __check_best_node_calculation(client, cache, key, value, key_hint=None):
async def inner_async():
await cache.put(key, value, key_hint=key_hint)
- best_node = await cache.get_best_node(key, key_hint=key_hint)
+ best_node = await client.get_best_node(cache, key, key_hint=key_hint)
for node in filter(lambda n: n.alive, client._nodes):
result = await cache_local_peek_async(node, cache.cache_id, key, key_hint=key_hint)
diff --git a/tests/affinity/test_affinity_request_routing.py b/tests/affinity/test_affinity_request_routing.py
index 64197ff..9c94aa4 100644
--- a/tests/affinity/test_affinity_request_routing.py
+++ b/tests/affinity/test_affinity_request_routing.py
@@ -14,6 +14,7 @@
# limitations under the License.
import asyncio
+import contextlib
from collections import OrderedDict, deque
import random
@@ -28,6 +29,11 @@ from pyignite.datatypes.cache_config import CacheMode
from pyignite.datatypes.prop_codes import PROP_NAME, PROP_BACKUPS_NUMBER, PROP_CACHE_KEY_CONFIGURATION, PROP_CACHE_MODE
from tests.util import wait_for_condition, wait_for_condition_async, start_ignite, kill_process_tree
+try:
+ from contextlib import asynccontextmanager
+except ImportError:
+ from async_generator import asynccontextmanager
+
requests = deque()
old_send = Connection.send
old_send_async = AioConnection._send
@@ -208,25 +214,36 @@ client_routed_connection_string = [('127.0.0.1', 10800 + idx) for idx in range(1
@pytest.fixture
-def client_routed_cache(request):
+def client_routed():
client = Client(partition_aware=True)
try:
client.connect(client_routed_connection_string)
- yield client.get_or_create_cache(request.node.name)
+ yield client
finally:
client.close()
@pytest.fixture
-async def async_client_routed_cache(request):
+def client_routed_cache(client_routed, request):
+ yield client_routed.get_or_create_cache(request.node.name)
+
+
+@pytest.fixture
+async def async_client_routed():
client = AioClient(partition_aware=True)
try:
await client.connect(client_routed_connection_string)
- yield await client.get_or_create_cache(request.node.name)
+ yield client
finally:
await client.close()
+@pytest.fixture
+async def async_client_routed_cache(async_client_routed, request):
+ cache = await async_client_routed.get_or_create_cache(request.node.name)
+ yield cache
+
+
def test_cache_operation_routed_to_new_cluster_node(client_routed_cache):
__perform_cache_operation_routed_to_new_node(client_routed_cache)
@@ -345,3 +362,114 @@ def verify_random_node(cache):
assert idx1 != idx2
return inner_async() if isinstance(cache, AioCache) else inner()
+
+
+@contextlib.contextmanager
+def create_caches(client):
+ caches = []
+ try:
+ caches = [client.create_cache(f'test_cache_{i}') for i in range(0, 10)]
+ yield caches
+ finally:
+ for cache in caches:
+ try:
+ cache.destroy()
+ except: # noqa: 13
+ cache.destroy() # Retry if connection failed.
+ pass
+
+
+@asynccontextmanager
+async def create_caches_async(client):
+ caches = []
+ try:
+ caches = await asyncio.gather(*[client.create_cache(f'test_cache_{i}') for i in range(0, 10)])
+ yield caches
+ finally:
+ for cache in caches:
+ try:
+ await cache.destroy()
+ except: # noqa: 13
+ await cache.destroy() # Retry if connection failed.
+ pass
+
+
+def test_new_registered_cache_affinity(client):
+ with create_caches(client) as caches:
+ key = 12
+ test_cache = random.choice(caches)
+ test_cache.put(key, key)
+ wait_for_affinity_distribution(test_cache, key, 3)
+
+ caches.append(client.create_cache('new_cache'))
+
+ for cache in caches:
+ cache.get(key)
+ assert requests.pop() == 3
+
+
+@pytest.mark.asyncio
+async def test_new_registered_cache_affinity_async(async_client):
+ async with create_caches_async(async_client) as caches:
+ key = 12
+ test_cache = random.choice(caches)
+ test_cache.put(key, key)
+ await wait_for_affinity_distribution_async(test_cache, key, 3)
+
+ caches.append(await async_client.create_cache('new_cache'))
+
+ for cache in caches:
+ await cache.get(key)
+ assert requests.pop() == 3
+
+
+def test_all_registered_cache_updated_on_new_server(client_routed):
+ with create_caches(client_routed) as caches:
+ key = 12
+ test_cache = random.choice(caches)
+ wait_for_affinity_distribution(test_cache, key, 3)
+ test_cache.put(key, key)
+ assert requests.pop() == 3
+
+ srv = start_ignite(idx=4)
+ try:
+ # Wait for rebalance and partition map exchange
+ wait_for_affinity_distribution(test_cache, key, 4)
+
+ for cache in caches:
+ cache.get(key)
+ assert requests.pop() == 4
+ finally:
+ kill_process_tree(srv.pid)
+
+
+@pytest.mark.asyncio
+async def test_all_registered_cache_updated_on_new_server_async(async_client_routed):
+ async with create_caches_async(async_client_routed) as caches:
+ key = 12
+ test_cache = random.choice(caches)
+ await wait_for_affinity_distribution_async(test_cache, key, 3)
+ await test_cache.put(key, key)
+ assert requests.pop() == 3
+
+ srv = start_ignite(idx=4)
+ try:
+ # Wait for rebalance and partition map exchange
+ await wait_for_affinity_distribution_async(test_cache, key, 4)
+
+ for cache in caches:
+ await cache.get(key)
+ assert requests.pop() == 4
+ finally:
+ kill_process_tree(srv.pid)
+
+
+@pytest.mark.asyncio
+async def test_update_affinity_concurrently(async_client):
+ async with create_caches_async(async_client) as caches:
+ key = 12
+ await asyncio.gather(*[cache.put(key, key) for cache in caches])
+
+ for cache in caches:
+ await cache.get(key)
+ assert requests.pop() == 3
diff --git a/tests/common/test_cache_class.py b/tests/common/test_cache_class.py
index 02dfa82..b035d8f 100644
--- a/tests/common/test_cache_class.py
+++ b/tests/common/test_cache_class.py
@@ -36,7 +36,7 @@ def test_cache_create(client):
async def test_cache_create_async(async_client):
cache = await async_client.get_or_create_cache('my_oop_cache')
try:
- assert (await cache.name()) == (await cache.settings())[PROP_NAME] == 'my_oop_cache'
+ assert cache.name == (await cache.settings())[PROP_NAME] == 'my_oop_cache'
finally:
await cache.destroy()
@@ -94,7 +94,7 @@ async def test_cache_config_async(async_client, cache_config):
await async_client.create_cache(cache_config)
cache = await async_client.get_or_create_cache('my_oop_cache')
try:
- assert await cache.name() == cache_config[PROP_NAME]
+ assert cache.name == cache_config[PROP_NAME]
assert (await cache.settings())[PROP_CACHE_KEY_CONFIGURATION] == cache_config[PROP_CACHE_KEY_CONFIGURATION]
finally:
await cache.destroy()
diff --git a/tests/common/test_cache_size.py b/tests/common/test_cache_size.py
index d134903..f2ec3ed 100644
--- a/tests/common/test_cache_size.py
+++ b/tests/common/test_cache_size.py
@@ -15,8 +15,11 @@
import pytest
+from pyignite.datatypes.cache_config import WriteSynchronizationMode
from pyignite.datatypes.key_value import PeekModes
-from pyignite.datatypes.prop_codes import PROP_NAME, PROP_IS_ONHEAP_CACHE_ENABLED, PROP_BACKUPS_NUMBER
+from pyignite.datatypes.prop_codes import (
+ PROP_NAME, PROP_IS_ONHEAP_CACHE_ENABLED, PROP_BACKUPS_NUMBER, PROP_WRITE_SYNCHRONIZATION_MODE
+)
from tests.util import get_or_create_cache, get_or_create_cache_async
test_params = [
@@ -24,7 +27,8 @@ test_params = [
{
PROP_NAME: 'cache_onheap_backups_2',
PROP_IS_ONHEAP_CACHE_ENABLED: True,
- PROP_BACKUPS_NUMBER: 2
+ PROP_BACKUPS_NUMBER: 2,
+ PROP_WRITE_SYNCHRONIZATION_MODE: WriteSynchronizationMode.FULL_SYNC
},
[
[None, 1],
diff --git a/tests/util.py b/tests/util.py
index 064ac7a..5651739 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -26,7 +26,6 @@ import signal
import subprocess
import time
-from pyignite import Client, AioClient
try:
from contextlib import asynccontextmanager