You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2021/06/16 13:33:18 UTC
[ignite-python-thin-client] branch master updated: IGNITE-14911
Unify timeouts,
add support for datetime.timedelta for expiry_policy - Fixes #44.
This is an automated email from the ASF dual-hosted git repository.
ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-python-thin-client.git
The following commit(s) were added to refs/heads/master by this push:
new 92a115c IGNITE-14911 Unify timeouts, add support for datetime.timedelta for expiry_policy - Fixes #44.
92a115c is described below
commit 92a115cf450a71d811dc1af684b3ba7fa04a98f2
Author: Ivan Daschinsky <iv...@apache.org>
AuthorDate: Wed Jun 16 16:32:44 2021 +0300
IGNITE-14911 Unify timeouts, add support for datetime.timedelta for expiry_policy - Fixes #44.
---
docs/async_examples.rst | 6 ++--
docs/examples.rst | 6 ++--
examples/expiry_policy.py | 9 +++---
examples/transactions.py | 4 +--
pyignite/aio_client.py | 8 ++---
pyignite/cache.py | 16 +++++-----
pyignite/client.py | 8 ++---
pyignite/datatypes/cache_properties.py | 22 +++++++++++---
pyignite/datatypes/expiry_policy.py | 27 ++++++++++-------
pyignite/transaction.py | 46 +++++++++++++++++++---------
tests/common/test_expiry_policy.py | 55 +++++++++++++++++++++-------------
tests/common/test_transactions.py | 23 ++++++++++++--
12 files changed, 151 insertions(+), 79 deletions(-)
diff --git a/docs/async_examples.rst b/docs/async_examples.rst
index af61a75..4ce65ce 100644
--- a/docs/async_examples.rst
+++ b/docs/async_examples.rst
@@ -63,12 +63,12 @@ in cache settings dictionary on creation.
.. literalinclude:: ../examples/expiry_policy.py
:language: python
:dedent: 12
- :lines: 72-75
+ :lines: 73-76
.. literalinclude:: ../examples/expiry_policy.py
:language: python
:dedent: 12
- :lines: 81-89
+ :lines: 82-90
Secondly, expiry policy can be set for all cache operations, which are done under decorator. To create it use
:py:meth:`~pyignite.cache.BaseCache.with_expire_policy`
@@ -76,7 +76,7 @@ Secondly, expiry policy can be set for all cache operations, which are done unde
.. literalinclude:: ../examples/expiry_policy.py
:language: python
:dedent: 12
- :lines: 96-105
+ :lines: 97-106
Transactions
------------
diff --git a/docs/examples.rst b/docs/examples.rst
index e01f112..4ca0910 100644
--- a/docs/examples.rst
+++ b/docs/examples.rst
@@ -97,12 +97,12 @@ in cache settings dictionary on creation.
.. literalinclude:: ../examples/expiry_policy.py
:language: python
:dedent: 12
- :lines: 31-34
+ :lines: 32-35
.. literalinclude:: ../examples/expiry_policy.py
:language: python
:dedent: 12
- :lines: 40-46
+ :lines: 41-47
Secondly, expiry policy can be set for all cache operations, which are done under decorator. To create it use
:py:meth:`~pyignite.cache.BaseCache.with_expire_policy`
@@ -110,7 +110,7 @@ Secondly, expiry policy can be set for all cache operations, which are done unde
.. literalinclude:: ../examples/expiry_policy.py
:language: python
:dedent: 12
- :lines: 53-60
+ :lines: 54-61
Scan
====
diff --git a/examples/expiry_policy.py b/examples/expiry_policy.py
index 2002da1..3dbe54b 100644
--- a/examples/expiry_policy.py
+++ b/examples/expiry_policy.py
@@ -14,6 +14,7 @@
# limitations under the License.
import asyncio
import time
+from datetime import timedelta
from pyignite import Client, AioClient
from pyignite.datatypes import ExpiryPolicy
@@ -30,7 +31,7 @@ def main():
try:
ttl_cache = client.create_cache({
PROP_NAME: 'test',
- PROP_EXPIRY_POLICY: ExpiryPolicy(create=1.0)
+ PROP_EXPIRY_POLICY: ExpiryPolicy(create=timedelta(seconds=1.0))
})
except NotSupportedByClusterError:
print("'ExpiryPolicy' API is not supported by cluster. Finishing...")
@@ -50,7 +51,7 @@ def main():
print("Create simple Cache and set TTL through `with_expire_policy`")
simple_cache = client.create_cache('test')
try:
- ttl_cache = simple_cache.with_expire_policy(access=1.0)
+ ttl_cache = simple_cache.with_expire_policy(access=timedelta(seconds=1.0))
ttl_cache.put(1, 1)
time.sleep(0.5)
print(f"key = {1}, value = {ttl_cache.get(1)}")
@@ -71,7 +72,7 @@ async def async_main():
try:
ttl_cache = await client.create_cache({
PROP_NAME: 'test',
- PROP_EXPIRY_POLICY: ExpiryPolicy(create=1.0)
+ PROP_EXPIRY_POLICY: ExpiryPolicy(create=timedelta(seconds=1.0))
})
except NotSupportedByClusterError:
print("'ExpiryPolicy' API is not supported by cluster. Finishing...")
@@ -93,7 +94,7 @@ async def async_main():
print("Create simple Cache and set TTL through `with_expire_policy`")
simple_cache = await client.create_cache('test')
try:
- ttl_cache = simple_cache.with_expire_policy(access=1.0)
+ ttl_cache = simple_cache.with_expire_policy(access=timedelta(seconds=1.0))
await ttl_cache.put(1, 1)
await asyncio.sleep(0.5)
value = await ttl_cache.get(1)
diff --git a/examples/transactions.py b/examples/transactions.py
index ef9b08c..53da05f 100644
--- a/examples/transactions.py
+++ b/examples/transactions.py
@@ -62,7 +62,7 @@ async def async_example():
# rollback transaction on timeout.
try:
- async with client.tx_start(timeout=1.0, label='long-tx') as tx:
+ async with client.tx_start(timeout=1000, label='long-tx') as tx:
await cache.put(key, 'fail')
await asyncio.sleep(2.0)
await tx.commit()
@@ -114,7 +114,7 @@ def sync_example():
# rollback transaction on timeout.
try:
- with client.tx_start(timeout=1.0, label='long-tx') as tx:
+ with client.tx_start(timeout=1000, label='long-tx') as tx:
cache.put(key, 'fail')
time.sleep(2.0)
tx.commit()
diff --git a/pyignite/aio_client.py b/pyignite/aio_client.py
index 2bc850b..0bb2b8c 100644
--- a/pyignite/aio_client.py
+++ b/pyignite/aio_client.py
@@ -489,15 +489,15 @@ class AioClient(BaseClient):
def tx_start(self, concurrency: TransactionConcurrency = TransactionConcurrency.PESSIMISTIC,
isolation: TransactionIsolation = TransactionIsolation.REPEATABLE_READ,
- timeout: Union[int, float] = 0, label: Optional[str] = None) -> 'AioTransaction':
+ timeout: int = 0, label: Optional[str] = None) -> 'AioTransaction':
"""
Start async thin client transaction. **Supported only python 3.7+**
:param concurrency: (optional) transaction concurrency, see
- :py:class:`~pyignite.datatypes.transactions.TransactionConcurrency`
+ :py:class:`~pyignite.datatypes.transactions.TransactionConcurrency`,
:param isolation: (optional) transaction isolation level, see
- :py:class:`~pyignite.datatypes.transactions.TransactionIsolation`
- :param timeout: (optional) transaction timeout in seconds if float, in millis if int
+ :py:class:`~pyignite.datatypes.transactions.TransactionIsolation`,
+ :param timeout: (optional) transaction timeout in milliseconds,
:param label: (optional) transaction label.
:return: :py:class:`~pyignite.transaction.AioTransaction` instance.
"""
diff --git a/pyignite/cache.py b/pyignite/cache.py
index 79fa0f5..51f07c9 100644
--- a/pyignite/cache.py
+++ b/pyignite/cache.py
@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import datetime
from typing import Any, Iterable, Optional, Tuple, Union
from .api.tx_api import get_tx_connection
@@ -136,16 +136,16 @@ class BaseCache:
def with_expire_policy(
self, expiry_policy: Optional[ExpiryPolicy] = None,
- create: Union[int, float] = ExpiryPolicy.UNCHANGED,
- update: Union[int, float] = ExpiryPolicy.UNCHANGED,
- access: Union[int, float] = ExpiryPolicy.UNCHANGED
+ create: Union[int, datetime.timedelta] = ExpiryPolicy.UNCHANGED,
+ update: Union[int, datetime.timedelta] = ExpiryPolicy.UNCHANGED,
+ access: Union[int, datetime.timedelta] = ExpiryPolicy.UNCHANGED
):
"""
:param expiry_policy: optional :class:`~pyignite.datatypes.expiry_policy.ExpiryPolicy`
- object. If it is set, other params will be ignored.
- :param create: create TTL in seconds (float) or milliseconds (int),
- :param update: Create TTL in seconds (float) or milliseconds (int),
- :param access: Create TTL in seconds (float) or milliseconds (int).
+ object. If it is set, other params will be ignored,
+ :param create: TTL for create in milliseconds or :py:class:`~time.timedelta`,
+ :param update: TTL for update in milliseconds or :py:class:`~time.timedelta`,
+ :param access: TTL for access in milliseconds or :py:class:`~time.timedelta`,
:return: cache decorator with expiry policy set.
"""
if not self.client.protocol_context.is_expiry_policy_supported():
diff --git a/pyignite/client.py b/pyignite/client.py
index f848bcc..6a499a3 100644
--- a/pyignite/client.py
+++ b/pyignite/client.py
@@ -744,15 +744,15 @@ class Client(BaseClient):
def tx_start(self, concurrency: TransactionConcurrency = TransactionConcurrency.PESSIMISTIC,
isolation: TransactionIsolation = TransactionIsolation.REPEATABLE_READ,
- timeout: Union[int, float] = 0, label: Optional[str] = None) -> 'Transaction':
+ timeout: int = 0, label: Optional[str] = None) -> 'Transaction':
"""
Start thin client transaction.
:param concurrency: (optional) transaction concurrency, see
- :py:class:`~pyignite.datatypes.transactions.TransactionConcurrency`
+ :py:class:`~pyignite.datatypes.transactions.TransactionConcurrency`,
:param isolation: (optional) transaction isolation level, see
- :py:class:`~pyignite.datatypes.transactions.TransactionIsolation`
- :param timeout: (optional) transaction timeout in seconds if float, in millis if int
+ :py:class:`~pyignite.datatypes.transactions.TransactionIsolation`,
+ :param timeout: (optional) transaction timeout in milliseconds,
:param label: (optional) transaction label.
:return: :py:class:`~pyignite.transaction.Transaction` instance.
"""
diff --git a/pyignite/datatypes/cache_properties.py b/pyignite/datatypes/cache_properties.py
index 49327a3..0d7f402 100644
--- a/pyignite/datatypes/cache_properties.py
+++ b/pyignite/datatypes/cache_properties.py
@@ -14,6 +14,8 @@
# limitations under the License.
import ctypes
+import math
+from typing import Union
from . import ExpiryPolicy
from .prop_codes import *
@@ -137,6 +139,20 @@ class PropBase:
return cls.from_python(stream, value)
+class TimeoutProp(PropBase):
+ prop_data_class = Long
+
+ @classmethod
+ def from_python(cls, stream, value: int):
+ if not isinstance(value, int) or value < 0:
+ raise ValueError(f'Timeout value should be a positive integer, {value} passed instead')
+ return super().from_python(stream, value)
+
+ @classmethod
+ async def from_python_async(cls, stream, value):
+ return cls.from_python(stream, value)
+
+
class PropName(PropBase):
prop_code = PROP_NAME
prop_data_class = String
@@ -227,9 +243,8 @@ class PropRebalanceDelay(PropBase):
prop_data_class = Long
-class PropRebalanceTimeout(PropBase):
+class PropRebalanceTimeout(TimeoutProp):
prop_code = PROP_REBALANCE_TIMEOUT
- prop_data_class = Long
class PropRebalanceBatchSize(PropBase):
@@ -262,9 +277,8 @@ class PropCacheKeyConfiguration(PropBase):
prop_data_class = CacheKeyConfiguration
-class PropDefaultLockTimeout(PropBase):
+class PropDefaultLockTimeout(TimeoutProp):
prop_code = PROP_DEFAULT_LOCK_TIMEOUT
- prop_data_class = Long
class PropMaxConcurrentAsyncOperation(PropBase):
diff --git a/pyignite/datatypes/expiry_policy.py b/pyignite/datatypes/expiry_policy.py
index d729da5..95e37db 100644
--- a/pyignite/datatypes/expiry_policy.py
+++ b/pyignite/datatypes/expiry_policy.py
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import ctypes
+import math
+from datetime import timedelta
from io import SEEK_CUR
from typing import Union
@@ -22,13 +24,16 @@ from pyignite.constants import PROTOCOL_BYTE_ORDER
def _positive(_, attrib, value):
+ if isinstance(value, timedelta):
+ value = value.total_seconds() * 1000
+
if value < 0 and value not in [ExpiryPolicy.UNCHANGED, ExpiryPolicy.ETERNAL]:
raise ValueError(f"'{attrib.name}' value must not be negative")
def _write_duration(stream, value):
- if isinstance(value, float):
- value = int(value * 1000)
+ if isinstance(value, timedelta):
+ value = math.floor(value.total_seconds() * 1000)
stream.write(value.to_bytes(8, byteorder=PROTOCOL_BYTE_ORDER, signed=True))
@@ -44,17 +49,17 @@ class ExpiryPolicy:
#: Set TTL eternal.
ETERNAL = -1
- #: Set TTL for create in seconds(float) or millis(int)
- create = attr.ib(kw_only=True, default=UNCHANGED,
- validator=[attr.validators.instance_of((int, float)), _positive])
+ #: Set TTL for create in milliseconds or :py:class:`~time.timedelta`
+ create = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, timedelta],
+ validator=[attr.validators.instance_of((int, timedelta)), _positive])
- #: Set TTL for update in seconds(float) or millis(int)
- update = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, float],
- validator=[attr.validators.instance_of((int, float)), _positive])
+ #: Set TTL for update in milliseconds or :py:class:`~time.timedelta`
+ update = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, timedelta],
+ validator=[attr.validators.instance_of((int, timedelta)), _positive])
- #: Set TTL for access in seconds(float) or millis(int)
- access = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, float],
- validator=[attr.validators.instance_of((int, float)), _positive])
+ #: Set TTL for access in milliseconds or :py:class:`~time.timedelta`
+ access = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, timedelta],
+ validator=[attr.validators.instance_of((int, timedelta)), _positive])
class _CType(ctypes.LittleEndianStructure):
_pack_ = 1
diff --git a/pyignite/transaction.py b/pyignite/transaction.py
index 5bafa6b..eb77f8d 100644
--- a/pyignite/transaction.py
+++ b/pyignite/transaction.py
@@ -13,8 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import math
-from typing import Union
+from enum import IntEnum
+from typing import Union, Type
from pyignite.api.tx_api import tx_end, tx_start, tx_end_async, tx_start_async
from pyignite.datatypes import TransactionIsolation, TransactionConcurrency
@@ -22,21 +22,41 @@ from pyignite.exceptions import CacheError
from pyignite.utils import status_to_exception
-def _convert_to_millis(timeout: Union[int, float]) -> int:
- if isinstance(timeout, float):
- return math.floor(timeout * 1000)
- return timeout
+def _validate_int_enum_param(value: Union[int, IntEnum], cls: Type[IntEnum]):
+ if value not in cls:
+ raise ValueError(f'{value} not in {cls}')
+ return value
-class Transaction:
+def _validate_timeout(value):
+ if not isinstance(value, int) or value < 0:
+ raise ValueError(f'Timeout value should be a positive integer, {value} passed instead')
+ return value
+
+
+def _validate_label(value):
+ if value and not isinstance(value, str):
+ raise ValueError(f'Label should be str, {type(value)} passed instead')
+ return value
+
+
+class _BaseTransaction:
+ def __init__(self, client, concurrency=TransactionConcurrency.PESSIMISTIC,
+ isolation=TransactionIsolation.REPEATABLE_READ, timeout=0, label=None):
+ self.client = client
+ self.concurrency = _validate_int_enum_param(concurrency, TransactionConcurrency)
+ self.isolation = _validate_int_enum_param(isolation, TransactionIsolation)
+ self.timeout = _validate_timeout(timeout)
+ self.label, self.closed = _validate_label(label), False
+
+
+class Transaction(_BaseTransaction):
"""
Thin client transaction.
"""
def __init__(self, client, concurrency=TransactionConcurrency.PESSIMISTIC,
isolation=TransactionIsolation.REPEATABLE_READ, timeout=0, label=None):
- self.client, self.concurrency = client, concurrency
- self.isolation, self.timeout = isolation, _convert_to_millis(timeout)
- self.label, self.closed = label, False
+ super().__init__(client, concurrency, isolation, timeout, label)
self.tx_id = self.__start_tx()
def commit(self) -> None:
@@ -77,15 +97,13 @@ class Transaction:
return tx_end(self.tx_id, committed)
-class AioTransaction:
+class AioTransaction(_BaseTransaction):
"""
Async thin client transaction.
"""
def __init__(self, client, concurrency=TransactionConcurrency.PESSIMISTIC,
isolation=TransactionIsolation.REPEATABLE_READ, timeout=0, label=None):
- self.client, self.concurrency = client, concurrency
- self.isolation, self.timeout = isolation, _convert_to_millis(timeout)
- self.label, self.closed = label, False
+ super().__init__(client, concurrency, isolation, timeout, label)
def __await__(self):
return (yield from self.__aenter__().__await__())
diff --git a/tests/common/test_expiry_policy.py b/tests/common/test_expiry_policy.py
index 9dc4152..939a380 100644
--- a/tests/common/test_expiry_policy.py
+++ b/tests/common/test_expiry_policy.py
@@ -14,6 +14,7 @@
# limitations under the License.
import asyncio
import time
+from datetime import timedelta
import pytest
@@ -23,11 +24,11 @@ from pyignite.datatypes.prop_codes import PROP_NAME, PROP_EXPIRY_POLICY
@pytest.mark.skip_if_no_expiry_policy
def test_expiry_policy(cache):
- ttl, num_retries = 0.6, 10
+ ttl, num_retries = timedelta(seconds=0.6), 10
cache_eternal = cache.with_expire_policy(create=ExpiryPolicy.ETERNAL)
- cache_created = cache.with_expire_policy(create=0.6)
- cache_updated = cache.with_expire_policy(update=0.6)
- cache_accessed = cache.with_expire_policy(access=0.6)
+ cache_created = cache.with_expire_policy(create=ttl)
+ cache_updated = cache.with_expire_policy(update=ttl)
+ cache_accessed = cache.with_expire_policy(access=ttl)
for _ in range(num_retries):
cache.clear()
@@ -39,11 +40,11 @@ def test_expiry_policy(cache):
cache_updated.put(2, 2)
cache_accessed.put(3, 3)
- time.sleep(ttl * 2 / 3)
+ time.sleep(ttl.total_seconds() * 2 / 3)
result = [cache.contains_key(k) for k in range(4)]
- if time.time() - start >= ttl:
+ if time.time() - start >= ttl.total_seconds():
continue
assert all(result)
@@ -55,20 +56,20 @@ def test_expiry_policy(cache):
cache_updated.put(2, 3) # Check that update policy works.
cache_accessed.get(3) # Check that access policy works.
- time.sleep(ttl * 2 / 3)
+ time.sleep(ttl.total_seconds() * 2 / 3)
result = [cache.contains_key(k) for k in range(4)]
- if time.time() - start >= ttl:
+ if time.time() - start >= ttl.total_seconds():
continue
assert result == [True, False, True, True]
- time.sleep(ttl * 2 / 3)
+ time.sleep(ttl.total_seconds() * 2 / 3)
cache_updated.get(2) # Check that access doesn't matter for updated policy.
- time.sleep(ttl * 2 / 3)
+ time.sleep(ttl.total_seconds() * 2 / 3)
result = [cache.contains_key(k) for k in range(0, 4)]
assert result == [True, False, False, False]
@@ -77,11 +78,11 @@ def test_expiry_policy(cache):
@pytest.mark.asyncio
@pytest.mark.skip_if_no_expiry_policy
async def test_expiry_policy_async(async_cache):
- ttl, num_retries = 0.6, 10
+ ttl, num_retries = timedelta(seconds=0.6), 10
cache_eternal = async_cache.with_expire_policy(create=ExpiryPolicy.ETERNAL)
- cache_created = async_cache.with_expire_policy(create=0.6)
- cache_updated = async_cache.with_expire_policy(update=0.6)
- cache_accessed = async_cache.with_expire_policy(access=0.6)
+ cache_created = async_cache.with_expire_policy(create=ttl)
+ cache_updated = async_cache.with_expire_policy(update=ttl)
+ cache_accessed = async_cache.with_expire_policy(access=ttl)
for _ in range(num_retries):
await async_cache.clear()
@@ -95,11 +96,11 @@ async def test_expiry_policy_async(async_cache):
cache_accessed.put(3, 3)
)
- await asyncio.sleep(ttl * 2 / 3)
+ await asyncio.sleep(ttl.total_seconds() * 2 / 3)
result = await asyncio.gather(*[async_cache.contains_key(k) for k in range(4)])
- if time.time() - start >= ttl:
+ if time.time() - start >= ttl.total_seconds():
continue
assert all(result)
@@ -113,20 +114,20 @@ async def test_expiry_policy_async(async_cache):
cache_accessed.get(3) # Check that access policy works.
)
- await asyncio.sleep(ttl * 2 / 3)
+ await asyncio.sleep(ttl.total_seconds() * 2 / 3)
result = await asyncio.gather(*[async_cache.contains_key(k) for k in range(4)])
- if time.time() - start >= ttl:
+ if time.time() - start >= ttl.total_seconds():
continue
assert result == [True, False, True, True]
- await asyncio.sleep(ttl * 2 / 3)
+ await asyncio.sleep(ttl.total_seconds() * 2 / 3)
await cache_updated.get(2) # Check that access doesn't matter for updated policy.
- await asyncio.sleep(ttl * 2 / 3)
+ await asyncio.sleep(ttl.total_seconds() * 2 / 3)
result = await asyncio.gather(*[async_cache.contains_key(k) for k in range(4)])
assert result == [True, False, False, False]
@@ -169,3 +170,17 @@ async def test_create_cache_with_expiry_policy_async(async_client, expiry_policy
assert settings[PROP_EXPIRY_POLICY] == expiry_policy
finally:
await cache.destroy()
+
+
+@pytest.mark.skip_if_no_expiry_policy
+@pytest.mark.parametrize(
+ 'params',
+ [
+ {'create': timedelta(seconds=-1), 'update': timedelta(seconds=-1), 'delete': timedelta(seconds=-1)},
+ {'create': 0.6},
+ {'create': -3}
+ ]
+)
+def test_expiry_policy_param_validation(params):
+ with pytest.raises((TypeError, ValueError)):
+ ExpiryPolicy(**params)
diff --git a/tests/common/test_transactions.py b/tests/common/test_transactions.py
index 57874b6..e879f60 100644
--- a/tests/common/test_transactions.py
+++ b/tests/common/test_transactions.py
@@ -25,6 +25,7 @@ from pyignite.datatypes import TransactionIsolation, TransactionConcurrency
from pyignite.datatypes.cache_config import CacheAtomicityMode
from pyignite.datatypes.prop_codes import PROP_NAME, PROP_CACHE_ATOMICITY_MODE
from pyignite.exceptions import CacheError
+from pyignite.transaction import Transaction, AioTransaction
@pytest.fixture
@@ -137,7 +138,7 @@ async def test_simple_transaction_async(async_client, async_tx_cache, iso_level,
def test_transactions_timeout(client, tx_cache):
- with client.tx_start(timeout=2.0, label='tx-sync') as tx:
+ with client.tx_start(timeout=2000, label='tx-sync') as tx:
tx_cache.put(1, 1)
time.sleep(3.0)
with pytest.raises(CacheError) as to_error:
@@ -160,7 +161,7 @@ async def test_transactions_timeout_async(async_client, async_tx_cache):
await tx.commit()
- task = asyncio.gather(*[update(i, 2.0) for i in range(20)], return_exceptions=True)
+ task = asyncio.gather(*[update(i, 2000) for i in range(20)], return_exceptions=True)
await asyncio.sleep(5.0)
assert task.done() # Check that all transactions completed or rolled-back on timeout
for i, ex in enumerate(task.result()):
@@ -231,3 +232,21 @@ async def test_concurrent_transactions(async_client, async_tx_cache, iso_level,
await asyncio.gather(*[update(i) for i in range(20)], return_exceptions=True)
assert await async_tx_cache.get_all(list(range(20))) == {i: f'test-{i}' for i in range(20) if i % 2 == 0}
+
+
+@pytest.mark.parametrize(
+ "params",
+ [
+ {'isolation': 25},
+ {'concurrency': 45},
+ {'timeout': 2.0},
+ {'timeout': -10},
+ {'label': 100500}
+ ]
+)
+def test_tx_parameter_validation(params):
+ with pytest.raises((TypeError, ValueError)):
+ Transaction(None, **params)
+
+ with pytest.raises((TypeError, ValueError)):
+ AioTransaction(None, **params)