You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2021/04/26 12:54:04 UTC
[ignite-python-thin-client] 01/01: IGNITE-14595 Implement
ExpiryPolicy support - Fixes #35.
This is an automated email from the ASF dual-hosted git repository.
ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-python-thin-client.git
commit 9f72781795e6fa1f427e559426ab931a49772716
Author: Ivan Daschinsky <iv...@apache.org>
AuthorDate: Fri Apr 23 10:45:03 2021 +0300
IGNITE-14595 Implement ExpiryPolicy support - Fixes #35.
---
README.md | 4 +-
docs/async_examples.rst | 31 +-
docs/datatypes/cache_props.rst | 152 +++---
docs/examples.rst | 32 +-
.../{pyignite.rst => pyignite.aio_cluster.rst} | 29 +-
docs/source/{pyignite.rst => pyignite.cluster.rst} | 29 +-
...te.rst => pyignite.datatypes.cluster_state.rst} | 30 +-
...te.rst => pyignite.datatypes.expiry_policy.rst} | 30 +-
docs/source/pyignite.datatypes.rst | 3 +-
docs/source/pyignite.rst | 3 +-
examples/expiry_policy.py | 113 +++++
examples/failover.py | 45 +-
pyignite/aio_cache.py | 57 +--
pyignite/aio_client.py | 17 +-
pyignite/aio_cluster.py | 20 +-
pyignite/api/cache_config.py | 39 +-
pyignite/api/key_value.py | 544 +++++++++------------
pyignite/api/sql.py | 84 ++--
pyignite/cache.py | 107 ++--
pyignite/client.py | 21 +-
pyignite/cluster.py | 20 +-
pyignite/connection/protocol_context.py | 3 +
pyignite/cursors.py | 58 +--
pyignite/datatypes/__init__.py | 2 +
pyignite/datatypes/cache_config.py | 74 +--
pyignite/datatypes/cache_properties.py | 9 +-
pyignite/datatypes/cluster_state.py | 4 +
pyignite/datatypes/expiry_policy.py | 110 +++++
pyignite/datatypes/prop_codes.py | 1 +
pyignite/queries/query.py | 33 +-
tests/affinity/conftest.py | 7 +-
tests/affinity/test_affinity.py | 4 +-
tests/common/conftest.py | 11 +
tests/common/test_binary.py | 4 +-
tests/common/test_cache_config.py | 13 +-
tests/common/test_expiry_policy.py | 171 +++++++
tests/conftest.py | 1 +
tests/custom/test_cluster.py | 11 +-
38 files changed, 1161 insertions(+), 765 deletions(-)
diff --git a/README.md b/README.md
index 7a23147..8e009de 100644
--- a/README.md
+++ b/README.md
@@ -3,9 +3,9 @@ Apache Ignite thin (binary protocol) client, written in Python 3.
## Prerequisites
-- Python 3.4 or above (3.6, 3.7 and 3.8 are tested),
+- Python 3.6 or above (3.6, 3.7, 3.8 and 3.9 are tested),
- Access to Apache Ignite node, local or remote. The current thin client
- version was tested on Apache Ignite 2.7.0 (binary client protocol 1.2.0).
+ version was tested on Apache Ignite 2.10 (binary client protocol 1.7.0).
## Installation
diff --git a/docs/async_examples.rst b/docs/async_examples.rst
index 363599a..4bc21ae 100644
--- a/docs/async_examples.rst
+++ b/docs/async_examples.rst
@@ -48,14 +48,39 @@ that yields the resulting rows.
.. literalinclude:: ../examples/async_key_value.py
:language: python
- :dedent: 4
+ :dedent: 8
:lines: 39-50
+ExpiryPolicy
+============
+File: `expiry_policy.py`_.
-File: `async_sql.py`_.
+You can enable an expiry policy (TTL) in two ways.
+
+Firstly, an expiry policy can be set for an entire cache by setting :py:attr:`~pyignite.datatypes.prop_codes.PROP_EXPIRY_POLICY`
+in the cache settings dictionary on creation.
+
+.. literalinclude:: ../examples/expiry_policy.py
+ :language: python
+ :dedent: 12
+ :lines: 72-75
+
+.. literalinclude:: ../examples/expiry_policy.py
+ :language: python
+ :dedent: 12
+ :lines: 81-89
+
+Secondly, an expiry policy can be applied to all cache operations performed through a cache decorator. To create one, use
+:py:meth:`~pyignite.cache.BaseCache.with_expire_policy`.
+
+.. literalinclude:: ../examples/expiry_policy.py
+ :language: python
+ :dedent: 12
+ :lines: 96-105
SQL
---
+File: `async_sql.py`_.
First let us establish a connection.
@@ -146,6 +171,6 @@ Finally, delete the tables used in this example with the following queries:
-
+.. _expiry_policy.py: https://github.com/apache/ignite-python-thin-client/blob/master/examples/expiry_policy.py
.. _async_key_value.py: https://github.com/apache/ignite-python-thin-client/blob/master/examples/async_key_value.py
.. _async_sql.py: https://github.com/apache/ignite-python-thin-client/blob/master/examples/async_sql.py
\ No newline at end of file
diff --git a/docs/datatypes/cache_props.rst b/docs/datatypes/cache_props.rst
index 3cabbe6..380ccf2 100644
--- a/docs/datatypes/cache_props.rst
+++ b/docs/datatypes/cache_props.rst
@@ -26,78 +26,80 @@ Please refer to the `Apache Ignite Data Grid`_ documentation on cache
synchronization, rebalance, affinity and other cache configuration-related
matters.
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| Property | Ordinal | Property | Description |
-| name | value | type | |
-+=======================================+==========+==========+=======================================================+
-| Read/write cache properties, used to configure cache via :py:meth:`~pyignite.client.Client.create_cache` or |
-| :py:meth:`~pyignite.client.Client.get_or_create_cache` of :py:class:`~pyignite.client.Client` |
-| (:py:meth:`~pyignite.aio_client.AioClient.create_cache` or |
-| :py:meth:`~pyignite.aio_client.AioClient.get_or_create_cache` of :py:class:`~pyignite.aio_client.AioClient`). |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_NAME | 0 | str | Cache name. This is the only *required* property. |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_CACHE_MODE | 1 | int | Cache mode: LOCAL=0, REPLICATED=1, PARTITIONED=2 |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_CACHE_ATOMICITY_MODE | 2 | int | Cache atomicity mode: TRANSACTIONAL=0, ATOMIC=1 |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_BACKUPS_NUMBER | 3 | int | Number of backups |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_WRITE_SYNCHRONIZATION_MODE | 4 | int | Write synchronization mode: FULL_SYNC=0, |
-| | | | FULL_ASYNC=1, PRIMARY_SYNC=2 |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_COPY_ON_READ | 5 | bool | Copy-on-read |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_READ_FROM_BACKUP | 6 | bool | Read from backup |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_DATA_REGION_NAME | 100 | str | Data region name |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_IS_ONHEAP_CACHE_ENABLED | 101 | bool | Is OnHeap cache enabled? |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_QUERY_ENTITIES | 200 | list | A list of query entities (see `Query entity`_) |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_QUERY_PARALLELISM | 201 | int | Query parallelism |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_QUERY_DETAIL_METRIC_SIZE | 202 | int | Query detail metric size |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_SQL_SCHEMA | 203 | str | SQL schema |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_SQL_INDEX_INLINE_MAX_SIZE | 204 | int | SQL index inline maximum size |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_SQL_ESCAPE_ALL | 205 | bool | Turns on SQL escapes |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_MAX_QUERY_ITERATORS | 206 | int | Maximum number of query iterators |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_REBALANCE_MODE | 300 | int | Rebalance mode: SYNC=0, ASYNC=1, NONE=2 |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_REBALANCE_DELAY | 301 | int | Rebalance delay (ms) |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_REBALANCE_TIMEOUT | 302 | int | Rebalance timeout (ms) |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_REBALANCE_BATCH_SIZE | 303 | int | Rebalance batch size |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_REBALANCE_BATCHES_PREFETCH_COUNT | 304 | int | Rebalance batches prefetch count |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_REBALANCE_ORDER | 305 | int | Rebalance order |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_REBALANCE_THROTTLE | 306 | int | Rebalance throttle (ms) |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_GROUP_NAME | 400 | str | Group name |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_CACHE_KEY_CONFIGURATION | 401 | list | Cache key configuration (see `Cache key`_) |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_DEFAULT_LOCK_TIMEOUT | 402 | int | Default lock timeout (ms) |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_MAX_CONCURRENT_ASYNC_OPERATIONS | 403 | int | Maximum number of concurrent asynchronous operations |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_PARTITION_LOSS_POLICY | 404 | int | Partition loss policy: READ_ONLY_SAFE=0, |
-| | | | READ_ONLY_ALL=1, READ_WRITE_SAFE=2, READ_WRITE_ALL=3, |
-| | | | IGNORE=4 |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_EAGER_TTL | 405 | bool | Eager TTL |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
-| PROP_STATISTICS_ENABLED | 406 | bool | Statistics enabled |
-+---------------------------------------+----------+----------+-------------------------------------------------------+
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| Property | Ordinal | Property | Description |
+| name | value | type | |
++=======================================+==========+============================================================+=======================================================+
+| Read/write cache properties, used to configure cache via :py:meth:`~pyignite.client.Client.create_cache` or |
+| :py:meth:`~pyignite.client.Client.get_or_create_cache` of :py:class:`~pyignite.client.Client` |
+| (:py:meth:`~pyignite.aio_client.AioClient.create_cache` or |
+| :py:meth:`~pyignite.aio_client.AioClient.get_or_create_cache` of :py:class:`~pyignite.aio_client.AioClient`). |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_NAME | 0 | str | Cache name. This is the only *required* property. |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_CACHE_MODE | 1 | int | Cache mode: LOCAL=0, REPLICATED=1, PARTITIONED=2 |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_CACHE_ATOMICITY_MODE | 2 | int | Cache atomicity mode: TRANSACTIONAL=0, ATOMIC=1 |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_BACKUPS_NUMBER | 3 | int | Number of backups |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_WRITE_SYNCHRONIZATION_MODE | 4 | int | Write synchronization mode: FULL_SYNC=0, |
+| | | | FULL_ASYNC=1, PRIMARY_SYNC=2 |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_COPY_ON_READ | 5 | bool | Copy-on-read |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_READ_FROM_BACKUP | 6 | bool | Read from backup |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_DATA_REGION_NAME | 100 | str | Data region name |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_IS_ONHEAP_CACHE_ENABLED | 101 | bool | Is OnHeap cache enabled? |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_QUERY_ENTITIES | 200 | list | A list of query entities (see `Query entity`_) |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_QUERY_PARALLELISM | 201 | int | Query parallelism |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_QUERY_DETAIL_METRIC_SIZE | 202 | int | Query detail metric size |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_SQL_SCHEMA | 203 | str | SQL schema |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_SQL_INDEX_INLINE_MAX_SIZE | 204 | int | SQL index inline maximum size |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_SQL_ESCAPE_ALL | 205 | bool | Turns on SQL escapes |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_MAX_QUERY_ITERATORS | 206 | int | Maximum number of query iterators |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_REBALANCE_MODE | 300 | int | Rebalance mode: SYNC=0, ASYNC=1, NONE=2 |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_REBALANCE_DELAY | 301 | int | Rebalance delay (ms) |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_REBALANCE_TIMEOUT | 302 | int | Rebalance timeout (ms) |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_REBALANCE_BATCH_SIZE | 303 | int | Rebalance batch size |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_REBALANCE_BATCHES_PREFETCH_COUNT | 304 | int | Rebalance batches prefetch count |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_REBALANCE_ORDER | 305 | int | Rebalance order |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_REBALANCE_THROTTLE | 306 | int | Rebalance throttle (ms) |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_GROUP_NAME | 400 | str | Group name |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_CACHE_KEY_CONFIGURATION | 401 | list | Cache key configuration (see `Cache key`_) |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_DEFAULT_LOCK_TIMEOUT | 402 | int | Default lock timeout (ms) |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_MAX_CONCURRENT_ASYNC_OPERATIONS | 403 | int | Maximum number of concurrent asynchronous operations |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_PARTITION_LOSS_POLICY | 404 | int | Partition loss policy: READ_ONLY_SAFE=0, |
+| | | | READ_ONLY_ALL=1, READ_WRITE_SAFE=2, READ_WRITE_ALL=3, |
+| | | | IGNORE=4 |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_EAGER_TTL | 405 | bool | Eager TTL |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_STATISTICS_ENABLED | 406 | bool | Statistics enabled |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
+| PROP_EXPIRY_POLICY | 407 | :py:class:`~pyignite.datatypes.expiry_policy.ExpiryPolicy` | Set expiry policy (see `Expiry policy`_) |
++---------------------------------------+----------+------------------------------------------------------------+-------------------------------------------------------+
Query entity
------------
@@ -159,3 +161,9 @@ A dict of the following format:
- `affinity_key_field_name`: name of the affinity key field.
.. _Apache Ignite Data Grid: https://apacheignite.readme.io/docs/data-grid
+
+Expiry policy
+-------------
+
+Sets the expiry policy for the cache (see :py:class:`~pyignite.datatypes.expiry_policy.ExpiryPolicy`). If set to `None`,
+no expiry policy is applied.
diff --git a/docs/examples.rst b/docs/examples.rst
index 0379330..07ec65c 100644
--- a/docs/examples.rst
+++ b/docs/examples.rst
@@ -85,6 +85,33 @@ As a rule of thumb:
Refer the :ref:`data_types` section for the full list
of parser/constructor classes you can use as type hints.
+ExpiryPolicy
+============
+File: `expiry_policy.py`_.
+
+You can enable an expiry policy (TTL) in two ways.
+
+Firstly, an expiry policy can be set for an entire cache by setting :py:attr:`~pyignite.datatypes.prop_codes.PROP_EXPIRY_POLICY`
+in the cache settings dictionary on creation.
+
+.. literalinclude:: ../examples/expiry_policy.py
+ :language: python
+ :dedent: 12
+ :lines: 31-34
+
+.. literalinclude:: ../examples/expiry_policy.py
+ :language: python
+ :dedent: 12
+ :lines: 40-46
+
+Secondly, an expiry policy can be applied to all cache operations performed through a cache decorator. To create one, use
+:py:meth:`~pyignite.cache.BaseCache.with_expire_policy`.
+
+.. literalinclude:: ../examples/expiry_policy.py
+ :language: python
+ :dedent: 12
+ :lines: 53-60
+
Scan
====
File: `scans.py`_.
@@ -558,13 +585,13 @@ Gather 3 Ignite nodes on `localhost` into one cluster and run:
.. literalinclude:: ../examples/failover.py
:language: python
- :lines: 16-53
+ :lines: 16-52
Then try shutting down and restarting nodes, and see what happens.
.. literalinclude:: ../examples/failover.py
:language: python
- :lines: 55-67
+ :lines: 54-66
Client reconnection do not require an explicit user action, like calling
a special method or resetting a parameter.
@@ -683,6 +710,7 @@ with the following message:
.. _type_hints.py: https://github.com/apache/ignite-python-thin-client/blob/master/examples/type_hints.py
.. _failover.py: https://github.com/apache/ignite-python-thin-client/blob/master/examples/failover.py
.. _scans.py: https://github.com/apache/ignite-python-thin-client/blob/master/examples/scans.py
+.. _expiry_policy.py: https://github.com/apache/ignite-python-thin-client/blob/master/examples/expiry_policy.py
.. _sql.py: https://github.com/apache/ignite-python-thin-client/blob/master/examples/sql.py
.. _async_sql.py: https://github.com/apache/ignite-python-thin-client/blob/master/examples/async_sql.py
.. _binary_basics.py: https://github.com/apache/ignite-python-thin-client/blob/master/examples/binary_basics.py
diff --git a/docs/source/pyignite.rst b/docs/source/pyignite.aio_cluster.rst
similarity index 70%
copy from docs/source/pyignite.rst
copy to docs/source/pyignite.aio_cluster.rst
index 85e31a8..ee2fa1b 100644
--- a/docs/source/pyignite.rst
+++ b/docs/source/pyignite.aio_cluster.rst
@@ -13,33 +13,10 @@
See the License for the specific language governing permissions and
limitations under the License.
-pyignite package
-================
+pyignite.aio_cluster module
+===========================
-.. automodule:: pyignite
+.. automodule:: pyignite.aio_cluster
:members:
:undoc-members:
:show-inheritance:
-
-Subpackages
------------
-
-.. toctree::
-
- pyignite.datatypes
- pyignite.connection
-
-Submodules
-----------
-
-.. toctree::
-
- pyignite.binary
- pyignite.cache
- pyignite.aio_cache
- pyignite.client
- pyignite.aio_client
- pyignite.constants
- pyignite.cursors
- pyignite.exceptions
-
diff --git a/docs/source/pyignite.rst b/docs/source/pyignite.cluster.rst
similarity index 70%
copy from docs/source/pyignite.rst
copy to docs/source/pyignite.cluster.rst
index 85e31a8..cacdfb7 100644
--- a/docs/source/pyignite.rst
+++ b/docs/source/pyignite.cluster.rst
@@ -13,33 +13,10 @@
See the License for the specific language governing permissions and
limitations under the License.
-pyignite package
-================
+pyignite.cluster module
+=======================
-.. automodule:: pyignite
+.. automodule:: pyignite.cluster
:members:
:undoc-members:
:show-inheritance:
-
-Subpackages
------------
-
-.. toctree::
-
- pyignite.datatypes
- pyignite.connection
-
-Submodules
-----------
-
-.. toctree::
-
- pyignite.binary
- pyignite.cache
- pyignite.aio_cache
- pyignite.client
- pyignite.aio_client
- pyignite.constants
- pyignite.cursors
- pyignite.exceptions
-
diff --git a/docs/source/pyignite.rst b/docs/source/pyignite.datatypes.cluster_state.rst
similarity index 68%
copy from docs/source/pyignite.rst
copy to docs/source/pyignite.datatypes.cluster_state.rst
index 85e31a8..a1d7663 100644
--- a/docs/source/pyignite.rst
+++ b/docs/source/pyignite.datatypes.cluster_state.rst
@@ -13,33 +13,9 @@
See the License for the specific language governing permissions and
limitations under the License.
-pyignite package
-================
+pyignite.datatypes.cluster_state module
+=======================================
-.. automodule:: pyignite
+.. automodule:: pyignite.datatypes.cluster_state
:members:
- :undoc-members:
:show-inheritance:
-
-Subpackages
------------
-
-.. toctree::
-
- pyignite.datatypes
- pyignite.connection
-
-Submodules
-----------
-
-.. toctree::
-
- pyignite.binary
- pyignite.cache
- pyignite.aio_cache
- pyignite.client
- pyignite.aio_client
- pyignite.constants
- pyignite.cursors
- pyignite.exceptions
-
diff --git a/docs/source/pyignite.rst b/docs/source/pyignite.datatypes.expiry_policy.rst
similarity index 68%
copy from docs/source/pyignite.rst
copy to docs/source/pyignite.datatypes.expiry_policy.rst
index 85e31a8..87d651e 100644
--- a/docs/source/pyignite.rst
+++ b/docs/source/pyignite.datatypes.expiry_policy.rst
@@ -13,33 +13,9 @@
See the License for the specific language governing permissions and
limitations under the License.
-pyignite package
-================
+pyignite.datatypes.expiry_policy module
+=======================================
-.. automodule:: pyignite
+.. automodule:: pyignite.datatypes.expiry_policy
:members:
- :undoc-members:
:show-inheritance:
-
-Subpackages
------------
-
-.. toctree::
-
- pyignite.datatypes
- pyignite.connection
-
-Submodules
-----------
-
-.. toctree::
-
- pyignite.binary
- pyignite.cache
- pyignite.aio_cache
- pyignite.client
- pyignite.aio_client
- pyignite.constants
- pyignite.cursors
- pyignite.exceptions
-
diff --git a/docs/source/pyignite.datatypes.rst b/docs/source/pyignite.datatypes.rst
index 269d500..70f7714 100644
--- a/docs/source/pyignite.datatypes.rst
+++ b/docs/source/pyignite.datatypes.rst
@@ -31,6 +31,8 @@ Submodules
pyignite.datatypes.cache_config
pyignite.datatypes.cache_properties
pyignite.datatypes.complex
+ pyignite.datatypes.cluster_state
+ pyignite.datatypes.expiry_policy
pyignite.datatypes.internal
pyignite.datatypes.key_value
pyignite.datatypes.null_object
@@ -39,4 +41,3 @@ Submodules
pyignite.datatypes.primitive_objects
pyignite.datatypes.sql
pyignite.datatypes.standard
-
diff --git a/docs/source/pyignite.rst b/docs/source/pyignite.rst
index 85e31a8..c2a36fe 100644
--- a/docs/source/pyignite.rst
+++ b/docs/source/pyignite.rst
@@ -39,7 +39,8 @@ Submodules
pyignite.aio_cache
pyignite.client
pyignite.aio_client
- pyignite.constants
+ pyignite.cluster
+ pyignite.aio_cluster
pyignite.cursors
pyignite.exceptions
diff --git a/examples/expiry_policy.py b/examples/expiry_policy.py
new file mode 100644
index 0000000..2002da1
--- /dev/null
+++ b/examples/expiry_policy.py
@@ -0,0 +1,113 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import asyncio
+import time
+
+from pyignite import Client, AioClient
+from pyignite.datatypes import ExpiryPolicy
+from pyignite.datatypes.prop_codes import PROP_NAME, PROP_EXPIRY_POLICY
+from pyignite.exceptions import NotSupportedByClusterError
+
+
+def main():
+ print("Running sync ExpiryPolicy example.")
+
+ client = Client()
+ with client.connect('127.0.0.1', 10800):
+ print("Create cache with expiry policy.")
+ try:
+ ttl_cache = client.create_cache({
+ PROP_NAME: 'test',
+ PROP_EXPIRY_POLICY: ExpiryPolicy(create=1.0)
+ })
+ except NotSupportedByClusterError:
+ print("'ExpiryPolicy' API is not supported by cluster. Finishing...")
+ return
+
+ try:
+ ttl_cache.put(1, 1)
+ time.sleep(0.5)
+ print(f"key = {1}, value = {ttl_cache.get(1)}")
+ # key = 1, value = 1
+ time.sleep(1.2)
+ print(f"key = {1}, value = {ttl_cache.get(1)}")
+ # key = 1, value = None
+ finally:
+ ttl_cache.destroy()
+
+ print("Create simple Cache and set TTL through `with_expire_policy`")
+ simple_cache = client.create_cache('test')
+ try:
+ ttl_cache = simple_cache.with_expire_policy(access=1.0)
+ ttl_cache.put(1, 1)
+ time.sleep(0.5)
+ print(f"key = {1}, value = {ttl_cache.get(1)}")
+ # key = 1, value = 1
+ time.sleep(1.7)
+ print(f"key = {1}, value = {ttl_cache.get(1)}")
+ # key = 1, value = None
+ finally:
+ simple_cache.destroy()
+
+
+async def async_main():
+ print("Running async ExpiryPolicy example.")
+
+ client = AioClient()
+ async with client.connect('127.0.0.1', 10800):
+ print("Create cache with expiry policy.")
+ try:
+ ttl_cache = await client.create_cache({
+ PROP_NAME: 'test',
+ PROP_EXPIRY_POLICY: ExpiryPolicy(create=1.0)
+ })
+ except NotSupportedByClusterError:
+ print("'ExpiryPolicy' API is not supported by cluster. Finishing...")
+ return
+
+ try:
+ await ttl_cache.put(1, 1)
+ await asyncio.sleep(0.5)
+ value = await ttl_cache.get(1)
+ print(f"key = {1}, value = {value}")
+ # key = 1, value = 1
+ await asyncio.sleep(1.2)
+ value = await ttl_cache.get(1)
+ print(f"key = {1}, value = {value}")
+ # key = 1, value = None
+ finally:
+ await ttl_cache.destroy()
+
+ print("Create simple Cache and set TTL through `with_expire_policy`")
+ simple_cache = await client.create_cache('test')
+ try:
+ ttl_cache = simple_cache.with_expire_policy(access=1.0)
+ await ttl_cache.put(1, 1)
+ await asyncio.sleep(0.5)
+ value = await ttl_cache.get(1)
+ print(f"key = {1}, value = {value}")
+ # key = 1, value = 1
+ await asyncio.sleep(1.7)
+ value = await ttl_cache.get(1)
+ print(f"key = {1}, value = {value}")
+ # key = 1, value = None
+ finally:
+ await simple_cache.destroy()
+
+if __name__ == '__main__':
+ main()
+
+ loop = asyncio.get_event_loop()
+ loop.run_until_complete(async_main())
diff --git a/examples/failover.py b/examples/failover.py
index 21ab547..3a5ee42 100644
--- a/examples/failover.py
+++ b/examples/failover.py
@@ -26,31 +26,30 @@ nodes = [
]
-def main():
- client = Client(timeout=4.0)
- with client.connect(nodes):
- print('Connected')
+client = Client(timeout=4.0)
+with client.connect(nodes):
+ print('Connected')
- my_cache = client.get_or_create_cache({
- PROP_NAME: 'my_cache',
- PROP_CACHE_MODE: CacheMode.PARTITIONED,
- PROP_BACKUPS_NUMBER: 2,
- })
- my_cache.put('test_key', 0)
- test_value = 0
+ my_cache = client.get_or_create_cache({
+ PROP_NAME: 'my_cache',
+ PROP_CACHE_MODE: CacheMode.PARTITIONED,
+ PROP_BACKUPS_NUMBER: 2,
+ })
+ my_cache.put('test_key', 0)
+ test_value = 0
- # abstract main loop
- while True:
- try:
- # do the work
- test_value = my_cache.get('test_key') or 0
- my_cache.put('test_key', test_value + 1)
- except (OSError, SocketError) as e:
- # recover from error (repeat last command, check data
- # consistency or just continue − depends on the task)
- print(f'Error: {e}')
- print(f'Last value: {test_value}')
- print('Reconnecting')
+ # abstract main loop
+ while True:
+ try:
+ # do the work
+ test_value = my_cache.get('test_key') or 0
+ my_cache.put('test_key', test_value + 1)
+ except (OSError, SocketError) as e:
+ # recover from error (repeat last command, check data
+ # consistency or just continue − depends on the task)
+ print(f'Error: {e}')
+ print(f'Last value: {test_value}')
+ print('Reconnecting')
# Connected
# Error: Connection broken.
diff --git a/pyignite/aio_cache.py b/pyignite/aio_cache.py
index 32f2cb2..f088844 100644
--- a/pyignite/aio_cache.py
+++ b/pyignite/aio_cache.py
@@ -15,6 +15,7 @@
import asyncio
from typing import Any, Iterable, Optional, Union
+from .datatypes import ExpiryPolicy
from .datatypes.internal import AnyDataObject
from .exceptions import CacheCreationError, CacheError, ParameterError
from .utils import status_to_exception
@@ -80,17 +81,17 @@ class AioCache(BaseCache):
:py:meth:`~pyignite.aio_client.AioClient.get_cache` methods instead. See
:ref:`this example <create_cache>` on how to do it.
"""
- def __init__(self, client: 'AioClient', name: str):
+ def __init__(self, client: 'AioClient', name: str, expiry_policy: ExpiryPolicy = None):
"""
Initialize async cache object. For internal use.
:param client: Async Ignite client,
:param name: Cache name.
"""
- super().__init__(client, name)
+ super().__init__(client, name, expiry_policy)
async def _get_best_node(self, key=None, key_hint=None):
- return await self.client.get_best_node(self._cache_id, key, key_hint)
+ return await self.client.get_best_node(self, key, key_hint)
async def settings(self) -> Optional[dict]:
"""
@@ -103,7 +104,7 @@ class AioCache(BaseCache):
"""
if self._settings is None:
conn = await self._get_best_node()
- config_result = await cache_get_configuration_async(conn, self._cache_id)
+ config_result = await cache_get_configuration_async(conn, self.cache_info)
if config_result.status == 0:
self._settings = config_result.value
@@ -118,7 +119,7 @@ class AioCache(BaseCache):
Destroys cache with a given name.
"""
conn = await self._get_best_node()
- return await cache_destroy_async(conn, self._cache_id)
+ return await cache_destroy_async(conn, self.cache_id)
@status_to_exception(CacheError)
async def get(self, key, key_hint: object = None) -> Any:
@@ -134,7 +135,7 @@ class AioCache(BaseCache):
key_hint = AnyDataObject.map_python_type(key)
conn = await self._get_best_node(key, key_hint)
- result = await cache_get_async(conn, self._cache_id, key, key_hint=key_hint)
+ result = await cache_get_async(conn, self.cache_info, key, key_hint=key_hint)
result.value = await self.client.unwrap_binary(result.value)
return result
@@ -155,7 +156,7 @@ class AioCache(BaseCache):
key_hint = AnyDataObject.map_python_type(key)
conn = await self._get_best_node(key, key_hint)
- return await cache_put_async(conn, self._cache_id, key, value, key_hint=key_hint, value_hint=value_hint)
+ return await cache_put_async(conn, self.cache_info, key, value, key_hint=key_hint, value_hint=value_hint)
@status_to_exception(CacheError)
async def get_all(self, keys: list) -> list:
@@ -166,7 +167,7 @@ class AioCache(BaseCache):
:return: a dict of key-value pairs.
"""
conn = await self._get_best_node()
- result = await cache_get_all_async(conn, self._cache_id, keys)
+ result = await cache_get_all_async(conn, self.cache_info, keys)
if result.value:
keys = list(result.value.keys())
values = await asyncio.gather(*[self.client.unwrap_binary(value) for value in result.value.values()])
@@ -186,7 +187,7 @@ class AioCache(BaseCache):
Python type or a tuple of (item, hint),
"""
conn = await self._get_best_node()
- return await cache_put_all_async(conn, self._cache_id, pairs)
+ return await cache_put_all_async(conn, self.cache_info, pairs)
@status_to_exception(CacheError)
async def replace(self, key, value, key_hint: object = None, value_hint: object = None):
@@ -204,7 +205,7 @@ class AioCache(BaseCache):
key_hint = AnyDataObject.map_python_type(key)
conn = await self._get_best_node(key, key_hint)
- result = await cache_replace_async(conn, self._cache_id, key, value, key_hint=key_hint, value_hint=value_hint)
+ result = await cache_replace_async(conn, self.cache_info, key, value, key_hint=key_hint, value_hint=value_hint)
result.value = await self.client.unwrap_binary(result.value)
return result
@@ -218,9 +219,9 @@ class AioCache(BaseCache):
"""
conn = await self._get_best_node()
if keys:
- return await cache_clear_keys_async(conn, self._cache_id, keys)
+ return await cache_clear_keys_async(conn, self.cache_info, keys)
else:
- return await cache_clear_async(conn, self._cache_id)
+ return await cache_clear_async(conn, self.cache_info)
@status_to_exception(CacheError)
async def clear_key(self, key, key_hint: object = None):
@@ -235,7 +236,7 @@ class AioCache(BaseCache):
key_hint = AnyDataObject.map_python_type(key)
conn = await self._get_best_node(key, key_hint)
- return await cache_clear_key_async(conn, self._cache_id, key, key_hint=key_hint)
+ return await cache_clear_key_async(conn, self.cache_info, key, key_hint=key_hint)
@status_to_exception(CacheError)
async def clear_keys(self, keys: Iterable):
@@ -245,7 +246,7 @@ class AioCache(BaseCache):
:param keys: a list of keys or (key, type hint) tuples
"""
conn = await self._get_best_node()
- return await cache_clear_keys_async(conn, self._cache_id, keys)
+ return await cache_clear_keys_async(conn, self.cache_info, keys)
@status_to_exception(CacheError)
async def contains_key(self, key, key_hint=None) -> bool:
@@ -261,7 +262,7 @@ class AioCache(BaseCache):
key_hint = AnyDataObject.map_python_type(key)
conn = await self._get_best_node(key, key_hint)
- return await cache_contains_key_async(conn, self._cache_id, key, key_hint=key_hint)
+ return await cache_contains_key_async(conn, self.cache_info, key, key_hint=key_hint)
@status_to_exception(CacheError)
async def contains_keys(self, keys: Iterable) -> bool:
@@ -272,7 +273,7 @@ class AioCache(BaseCache):
:return: boolean `True` when all keys are present, `False` otherwise.
"""
conn = await self._get_best_node()
- return await cache_contains_keys_async(conn, self._cache_id, keys)
+ return await cache_contains_keys_async(conn, self.cache_info, keys)
@status_to_exception(CacheError)
async def get_and_put(self, key, value, key_hint=None, value_hint=None) -> Any:
@@ -292,7 +293,7 @@ class AioCache(BaseCache):
key_hint = AnyDataObject.map_python_type(key)
conn = await self._get_best_node(key, key_hint)
- result = await cache_get_and_put_async(conn, self._cache_id, key, value, key_hint, value_hint)
+ result = await cache_get_and_put_async(conn, self.cache_info, key, value, key_hint, value_hint)
result.value = await self.client.unwrap_binary(result.value)
return result
@@ -315,7 +316,7 @@ class AioCache(BaseCache):
key_hint = AnyDataObject.map_python_type(key)
conn = await self._get_best_node(key, key_hint)
- result = await cache_get_and_put_if_absent_async(conn, self._cache_id, key, value, key_hint, value_hint)
+ result = await cache_get_and_put_if_absent_async(conn, self.cache_info, key, value, key_hint, value_hint)
result.value = await self.client.unwrap_binary(result.value)
return result
@@ -336,7 +337,7 @@ class AioCache(BaseCache):
key_hint = AnyDataObject.map_python_type(key)
conn = await self._get_best_node(key, key_hint)
- return await cache_put_if_absent_async(conn, self._cache_id, key, value, key_hint, value_hint)
+ return await cache_put_if_absent_async(conn, self.cache_info, key, value, key_hint, value_hint)
@status_to_exception(CacheError)
async def get_and_remove(self, key, key_hint=None) -> Any:
@@ -352,7 +353,7 @@ class AioCache(BaseCache):
key_hint = AnyDataObject.map_python_type(key)
conn = await self._get_best_node(key, key_hint)
- result = await cache_get_and_remove_async(conn, self._cache_id, key, key_hint)
+ result = await cache_get_and_remove_async(conn, self.cache_info, key, key_hint)
result.value = await self.client.unwrap_binary(result.value)
return result
@@ -375,7 +376,7 @@ class AioCache(BaseCache):
key_hint = AnyDataObject.map_python_type(key)
conn = await self._get_best_node(key, key_hint)
- result = await cache_get_and_replace_async(conn, self._cache_id, key, value, key_hint, value_hint)
+ result = await cache_get_and_replace_async(conn, self.cache_info, key, value, key_hint, value_hint)
result.value = await self.client.unwrap_binary(result.value)
return result
@@ -392,7 +393,7 @@ class AioCache(BaseCache):
key_hint = AnyDataObject.map_python_type(key)
conn = await self._get_best_node(key, key_hint)
- return await cache_remove_key_async(conn, self._cache_id, key, key_hint)
+ return await cache_remove_key_async(conn, self.cache_info, key, key_hint)
@status_to_exception(CacheError)
async def remove_keys(self, keys: list):
@@ -403,7 +404,7 @@ class AioCache(BaseCache):
:param keys: list of keys or tuples of (key, key_hint) to remove.
"""
conn = await self._get_best_node()
- return await cache_remove_keys_async(conn, self._cache_id, keys)
+ return await cache_remove_keys_async(conn, self.cache_info, keys)
@status_to_exception(CacheError)
async def remove_all(self):
@@ -411,7 +412,7 @@ class AioCache(BaseCache):
Removes all cache entries, notifying listeners and cache writers.
"""
conn = await self._get_best_node()
- return await cache_remove_all_async(conn, self._cache_id)
+ return await cache_remove_all_async(conn, self.cache_info)
@status_to_exception(CacheError)
async def remove_if_equals(self, key, sample, key_hint=None, sample_hint=None):
@@ -430,7 +431,7 @@ class AioCache(BaseCache):
key_hint = AnyDataObject.map_python_type(key)
conn = await self._get_best_node(key, key_hint)
- return await cache_remove_if_equals_async(conn, self._cache_id, key, sample, key_hint, sample_hint)
+ return await cache_remove_if_equals_async(conn, self.cache_info, key, sample, key_hint, sample_hint)
@status_to_exception(CacheError)
async def replace_if_equals(self, key, sample, value, key_hint=None, sample_hint=None, value_hint=None) -> Any:
@@ -453,7 +454,7 @@ class AioCache(BaseCache):
key_hint = AnyDataObject.map_python_type(key)
conn = await self._get_best_node(key, key_hint)
- result = await cache_replace_if_equals_async(conn, self._cache_id, key, sample, value, key_hint, sample_hint,
+ result = await cache_replace_if_equals_async(conn, self.cache_info, key, sample, value, key_hint, sample_hint,
value_hint)
result.value = await self.client.unwrap_binary(result.value)
return result
@@ -469,7 +470,7 @@ class AioCache(BaseCache):
:return: integer number of cache entries.
"""
conn = await self._get_best_node()
- return await cache_get_size_async(conn, self._cache_id, peek_modes)
+ return await cache_get_size_async(conn, self.cache_info, peek_modes)
def scan(self, page_size: int = 1, partitions: int = -1, local: bool = False) -> AioScanCursor:
"""
@@ -484,4 +485,4 @@ class AioCache(BaseCache):
on local node only. Defaults to False,
:return: async scan query cursor
"""
- return AioScanCursor(self.client, self._cache_id, page_size, partitions, local)
+ return AioScanCursor(self.client, self.cache_info, page_size, partitions, local)
diff --git a/pyignite/aio_client.py b/pyignite/aio_client.py
index 7a5959d..b0498f7 100644
--- a/pyignite/aio_client.py
+++ b/pyignite/aio_client.py
@@ -29,6 +29,7 @@ from .connection import AioConnection
from .constants import AFFINITY_RETRIES, AFFINITY_DELAY
from .datatypes import BinaryObject
from .exceptions import BinaryTypeError, CacheError, ReconnectError, connection_errors
+from .queries.query import CacheInfo
from .stream import AioBinaryStream, READ_BACKWARD
from .utils import cache_id, entity_id, status_to_exception, is_wrapped
@@ -452,20 +453,24 @@ class AioClient(BaseClient):
:return: async sql fields cursor with result rows as lists. If
`include_field_names` was set, the first row will hold field names.
"""
+ if isinstance(cache, (int, str)):
+ c_info = CacheInfo(cache_id=cache_id(cache), protocol_context=self.protocol_context)
+ elif isinstance(cache, AioCache):
+ c_info = cache.cache_info
+ else:
+ c_info = None
- c_id = cache.cache_id if isinstance(cache, AioCache) else cache_id(cache)
-
- if c_id != 0:
+ if c_info:
schema = None
- return AioSqlFieldsCursor(self, c_id, query_str, page_size, query_args, schema, statement_type,
+ return AioSqlFieldsCursor(self, c_info, query_str, page_size, query_args, schema, statement_type,
distributed_joins, local, replicated_only, enforce_join_order, collocated,
lazy, include_field_names, max_rows, timeout)
def get_cluster(self) -> 'AioCluster':
"""
- Gets client cluster facade.
+ Get client cluster facade.
- :return: AioClient cluster facade.
+ :return: :py:class:`~pyignite.aio_cluster.AioCluster` instance.
"""
return AioCluster(self)
diff --git a/pyignite/aio_cluster.py b/pyignite/aio_cluster.py
index 6d76125..afbc41b 100644
--- a/pyignite/aio_cluster.py
+++ b/pyignite/aio_cluster.py
@@ -18,6 +18,7 @@ This module contains `AioCluster` that lets you get info and change state of the
whole cluster asynchronously.
"""
from pyignite.api.cluster import cluster_get_state_async, cluster_set_state_async
+from pyignite.datatypes import ClusterState
from pyignite.exceptions import ClusterError
from pyignite.utils import status_to_exception
@@ -30,27 +31,34 @@ class AioCluster:
"""
def __init__(self, client: 'AioClient'):
+ """
+ :param client: :py:class:`~pyignite.aio_client.AioClient` instance.
+ """
self._client = client
@status_to_exception(ClusterError)
- async def get_state(self):
+ async def get_state(self) -> 'ClusterState':
"""
Gets current cluster state.
- :return: Current cluster state. This is one of ClusterState.INACTIVE,
- ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
+ :return: Current cluster state. This is one of
+ :py:attr:`~pyignite.datatypes.cluster_state.ClusterState.INACTIVE`,
+ :py:attr:`~pyignite.datatypes.cluster_state.ClusterState.ACTIVE`,
+ :py:attr:`~pyignite.datatypes.cluster_state.ClusterState.ACTIVE_READ_ONLY`.
"""
return await cluster_get_state_async(await self._client.random_node())
@status_to_exception(ClusterError)
- async def set_state(self, state):
+ async def set_state(self, state: 'ClusterState'):
"""
Changes the current cluster state to the given one.
Note: Deactivation clears in-memory caches (without persistence)
including the system caches.
- :param state: New cluster state. This is one of ClusterState.INACTIVE,
- ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
+ :param state: New cluster state. This is one of
+ :py:attr:`~pyignite.datatypes.cluster_state.ClusterState.INACTIVE`,
+ :py:attr:`~pyignite.datatypes.cluster_state.ClusterState.ACTIVE`,
+ :py:attr:`~pyignite.datatypes.cluster_state.ClusterState.ACTIVE_READ_ONLY`.
"""
return await cluster_set_state_async(await self._client.random_node(), state)
diff --git a/pyignite/api/cache_config.py b/pyignite/api/cache_config.py
index 0adb549..7f2869b 100644
--- a/pyignite/api/cache_config.py
+++ b/pyignite/api/cache_config.py
@@ -26,9 +26,9 @@ only non-persistent storage tier.)
from typing import Union
from pyignite.connection import Connection, AioConnection
-from pyignite.datatypes.cache_config import cache_config_struct
+from pyignite.datatypes.cache_config import get_cache_config_struct
from pyignite.datatypes.cache_properties import prop_map
-from pyignite.datatypes import Int, Byte, prop_codes, Short, String, StringArray
+from pyignite.datatypes import Int, prop_codes, Short, String, StringArray
from pyignite.queries import Query, ConfigQuery, query_perform
from pyignite.queries.op_codes import (
OP_CACHE_GET_CONFIGURATION, OP_CACHE_CREATE_WITH_NAME, OP_CACHE_GET_OR_CREATE_WITH_NAME, OP_CACHE_DESTROY,
@@ -37,6 +37,9 @@ from pyignite.queries.op_codes import (
from pyignite.utils import cache_id
from .result import APIResult
+from ..datatypes.prop_codes import PROP_EXPIRY_POLICY
+from ..exceptions import NotSupportedByClusterError
+from ..queries.query import CacheInfo
def compact_cache_config(cache_config: dict) -> dict:
@@ -57,29 +60,27 @@ def compact_cache_config(cache_config: dict) -> dict:
return result
-def cache_get_configuration(connection: 'Connection', cache: Union[str, int],
- flags: int = 0, query_id=None) -> 'APIResult':
+def cache_get_configuration(connection: 'Connection', cache_info: CacheInfo, query_id=None) -> 'APIResult':
"""
Gets configuration for the given cache.
:param connection: connection to Ignite server,
- :param cache: name or ID of the cache,
- :param flags: Ignite documentation is unclear on this subject,
+ :param cache_info: cache meta info,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
:return: API result data object. Result value is OrderedDict with
the cache configuration parameters.
"""
- return __cache_get_configuration(connection, cache, flags, query_id)
+ return __cache_get_configuration(connection, cache_info, query_id)
-async def cache_get_configuration_async(connection: 'AioConnection', cache: Union[str, int],
- flags: int = 0, query_id=None) -> 'APIResult':
+async def cache_get_configuration_async(
+ connection: 'AioConnection', cache_info: CacheInfo, query_id=None) -> 'APIResult':
"""
Async version of cache_get_configuration.
"""
- return await __cache_get_configuration(connection, cache, flags, query_id)
+ return await __cache_get_configuration(connection, cache_info, query_id)
def __post_process_cache_config(result):
@@ -88,22 +89,20 @@ def __post_process_cache_config(result):
return result
-def __cache_get_configuration(connection, cache, flags, query_id):
+def __cache_get_configuration(connection, cache_info, query_id):
query_struct = Query(
OP_CACHE_GET_CONFIGURATION,
[
- ('hash_code', Int),
- ('flags', Byte),
+ ('cache_info', CacheInfo)
],
query_id=query_id,
)
return query_perform(query_struct, connection,
query_params={
- 'hash_code': cache_id(cache),
- 'flags': flags
+ 'cache_info': cache_info
},
response_config=[
- ('cache_config', cache_config_struct)
+ ('cache_config', get_cache_config_struct(connection.protocol_context))
],
post_process_fun=__post_process_cache_config
)
@@ -184,9 +183,9 @@ async def cache_destroy_async(connection: 'AioConnection', cache: Union[str, int
def __cache_destroy(connection, cache, query_id):
- query_struct = Query(OP_CACHE_DESTROY, [('hash_code', Int)], query_id=query_id)
+ query_struct = Query(OP_CACHE_DESTROY, [('cache_id', Int)], query_id=query_id)
- return query_perform(query_struct, connection, query_params={'hash_code': cache_id(cache)})
+ return query_perform(query_struct, connection, query_params={'cache_id': cache_id(cache)})
def cache_get_names(connection: 'Connection', query_id=None) -> 'APIResult':
@@ -278,8 +277,12 @@ async def cache_get_or_create_with_config_async(connection: 'AioConnection', cac
def __cache_create_with_config(op_code, connection, cache_props, query_id):
prop_types, prop_values = {}, {}
+ is_expiry_policy_supported = connection.protocol_context.is_expiry_policy_supported()
for i, prop_item in enumerate(cache_props.items()):
prop_code, prop_value = prop_item
+ if prop_code == PROP_EXPIRY_POLICY and not is_expiry_policy_supported:
+ raise NotSupportedByClusterError("'ExpiryPolicy' API is not supported by the cluster")
+
prop_name = 'property_{}'.format(i)
prop_types[prop_name] = prop_map(prop_code)
prop_values[prop_name] = prop_value
diff --git a/pyignite/api/key_value.py b/pyignite/api/key_value.py
index 9fb13bb..5038051 100644
--- a/pyignite/api/key_value.py
+++ b/pyignite/api/key_value.py
@@ -23,54 +23,51 @@ from pyignite.queries.op_codes import (
OP_CACHE_CLEAR_KEYS, OP_CACHE_REMOVE_KEY, OP_CACHE_REMOVE_IF_EQUALS, OP_CACHE_REMOVE_KEYS, OP_CACHE_REMOVE_ALL,
OP_CACHE_GET_SIZE, OP_CACHE_LOCAL_PEEK
)
-from pyignite.datatypes import Map, Bool, Byte, Int, Long, AnyDataArray, AnyDataObject, ByteArray
+from pyignite.datatypes import Map, Bool, Long, AnyDataArray, AnyDataObject, ByteArray
from pyignite.datatypes.base import IgniteDataType
from pyignite.queries import Query, query_perform
-from pyignite.utils import cache_id
from .result import APIResult
+from ..queries.query import CacheInfo
-def cache_put(connection: 'Connection', cache: Union[str, int], key: Any, value: Any,
- key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None, binary: bool = False,
+def cache_put(connection: 'Connection', cache_info: CacheInfo, key: Any, value: Any,
+ key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
query_id: Optional[int] = None) -> 'APIResult':
"""
Puts a value with a given key to cache (overwriting existing value if any).
:param connection: connection to Ignite server,
- :param cache: name or ID of the cache,
+ :param cache_info: cache meta info,
:param key: key for the cache entry. Can be of any supported type,
:param value: value for the key,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:param value_hint: (optional) Ignite data type, for which the given value
should be converted.
- :param binary: (optional) pass True to keep the value in binary form.
- False by default,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
:return: API result data object. Contains zero status if a value
is written, non-zero status and an error description otherwise.
"""
- return __cache_put(connection, cache, key, value, key_hint, value_hint, binary, query_id)
+ return __cache_put(connection, cache_info, key, value, key_hint, value_hint, query_id)
-async def cache_put_async(connection: 'AioConnection', cache: Union[str, int], key: Any, value: Any,
- key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None, binary: bool = False,
+async def cache_put_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
+ key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
query_id: Optional[int] = None) -> 'APIResult':
"""
Async version of cache_put
"""
- return await __cache_put(connection, cache, key, value, key_hint, value_hint, binary, query_id)
+ return await __cache_put(connection, cache_info, key, value, key_hint, value_hint, query_id)
-def __cache_put(connection, cache, key, value, key_hint, value_hint, binary, query_id):
+def __cache_put(connection, cache_info, key, value, key_hint, value_hint, query_id):
query_struct = Query(
OP_CACHE_PUT,
[
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
('value', value_hint or AnyDataObject),
],
@@ -79,50 +76,45 @@ def __cache_put(connection, cache, key, value, key_hint, value_hint, binary, que
return query_perform(
query_struct, connection,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
'key': key,
'value': value
}
)
-def cache_get(connection: 'Connection', cache: Union[str, int], key: Any, key_hint: 'IgniteDataType' = None,
- binary: bool = False, query_id: Optional[int] = None) -> 'APIResult':
+def cache_get(connection: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
+ query_id: Optional[int] = None) -> 'APIResult':
"""
Retrieves a value from cache by key.
:param connection: connection to Ignite server,
- :param cache: name or ID of the cache,
+ :param cache_info: cache meta info,
:param key: key for the cache entry. Can be of any supported type,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
- :param binary: (optional) pass True to keep the value in binary form.
- False by default,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
:return: API result data object. Contains zero status and a value
retrieved on success, non-zero status and an error description on failure.
"""
- return __cache_get(connection, cache, key, key_hint, binary, query_id)
+ return __cache_get(connection, cache_info, key, key_hint, query_id)
-async def cache_get_async(connection: 'AioConnection', cache: Union[str, int], key: Any,
- key_hint: 'IgniteDataType' = None, binary: bool = False,
- query_id: Optional[int] = None) -> 'APIResult':
+async def cache_get_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any,
+ key_hint: 'IgniteDataType' = None, query_id: Optional[int] = None) -> 'APIResult':
"""
Async version of cache_get
"""
- return await __cache_get(connection, cache, key, key_hint, binary, query_id)
+ return await __cache_get(connection, cache_info, key, key_hint, query_id)
-def __cache_get(connection, cache, key, key_hint, binary, query_id):
+def __cache_get(connection, cache_info, key, key_hint, query_id):
query_struct = Query(
OP_CACHE_GET,
[
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
],
query_id=query_id,
@@ -130,8 +122,7 @@ def __cache_get(connection, cache, key, key_hint, binary, query_id):
return query_perform(
query_struct, connection,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
'key': key,
},
response_config=[
@@ -141,16 +132,14 @@ def __cache_get(connection, cache, key, key_hint, binary, query_id):
)
-def cache_get_all(connection: 'Connection', cache: Union[str, int], keys: Iterable, binary: bool = False,
+def cache_get_all(connection: 'Connection', cache_info: CacheInfo, keys: Iterable,
query_id: Optional[int] = None) -> 'APIResult':
"""
Retrieves multiple key-value pairs from cache.
:param connection: connection to Ignite server,
- :param cache: name or ID of the cache,
+ :param cache_info: cache meta info,
:param keys: list of keys or tuples of (key, key_hint),
- :param binary: (optional) pass True to keep the value in binary form.
- False by default,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
@@ -158,23 +147,22 @@ def cache_get_all(connection: 'Connection', cache: Union[str, int], keys: Iterab
retrieved key-value pairs, non-zero status and an error description
on failure.
"""
- return __cache_get_all(connection, cache, keys, binary, query_id)
+ return __cache_get_all(connection, cache_info, keys, query_id)
-async def cache_get_all_async(connection: 'AioConnection', cache: Union[str, int], keys: Iterable, binary: bool = False,
+async def cache_get_all_async(connection: 'AioConnection', cache_info: CacheInfo, keys: Iterable,
query_id: Optional[int] = None) -> 'APIResult':
"""
Async version of cache_get_all.
"""
- return await __cache_get_all(connection, cache, keys, binary, query_id)
+ return await __cache_get_all(connection, cache_info, keys, query_id)
-def __cache_get_all(connection, cache, keys, binary, query_id):
+def __cache_get_all(connection, cache_info, keys, query_id):
query_struct = Query(
OP_CACHE_GET_ALL,
[
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
('keys', AnyDataArray()),
],
query_id=query_id,
@@ -182,8 +170,7 @@ def __cache_get_all(connection, cache, keys, binary, query_id):
return query_perform(
query_struct, connection,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
'keys': keys,
},
response_config=[
@@ -193,42 +180,39 @@ def __cache_get_all(connection, cache, keys, binary, query_id):
)
-def cache_put_all(connection: 'Connection', cache: Union[str, int], pairs: dict, binary: bool = False,
+def cache_put_all(connection: 'Connection', cache_info: CacheInfo, pairs: dict,
query_id: Optional[int] = None) -> 'APIResult':
"""
Puts multiple key-value pairs to cache (overwriting existing associations
if any).
:param connection: connection to Ignite server,
- :param cache: name or ID of the cache,
+ :param cache_info: cache meta info,
:param pairs: dictionary type parameters, contains key-value pairs to save.
Each key or value can be an item of representable Python type or a tuple
of (item, hint),
- :param binary: (optional) pass True to keep the value in binary form.
- False by default,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
:return: API result data object. Contains zero status if key-value pairs
are written, non-zero status and an error description otherwise.
"""
- return __cache_put_all(connection, cache, pairs, binary, query_id)
+ return __cache_put_all(connection, cache_info, pairs, query_id)
-async def cache_put_all_async(connection: 'AioConnection', cache: Union[str, int], pairs: dict, binary: bool = False,
+async def cache_put_all_async(connection: 'AioConnection', cache_info: CacheInfo, pairs: dict,
query_id: Optional[int] = None) -> 'APIResult':
"""
Async version of cache_put_all.
"""
- return await __cache_put_all(connection, cache, pairs, binary, query_id)
+ return await __cache_put_all(connection, cache_info, pairs, query_id)
-def __cache_put_all(connection, cache, pairs, binary, query_id):
+def __cache_put_all(connection, cache_info, pairs, query_id):
query_struct = Query(
OP_CACHE_PUT_ALL,
[
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
('data', Map),
],
query_id=query_id,
@@ -236,25 +220,22 @@ def __cache_put_all(connection, cache, pairs, binary, query_id):
return query_perform(
query_struct, connection,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
'data': pairs,
},
)
-def cache_contains_key(connection: 'Connection', cache: Union[str, int], key: Any, key_hint: 'IgniteDataType' = None,
- binary: bool = False, query_id: Optional[int] = None) -> 'APIResult':
+def cache_contains_key(connection: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
+ query_id: Optional[int] = None) -> 'APIResult':
"""
Returns a value indicating whether given key is present in cache.
:param connection: connection to Ignite server,
- :param cache: name or ID of the cache,
+ :param cache_info: cache meta info,
:param key: key for the cache entry. Can be of any supported type,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
- :param binary: pass True to keep the value in binary form. False
- by default,
:param query_id: a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
@@ -262,24 +243,22 @@ def cache_contains_key(connection: 'Connection', cache: Union[str, int], key: An
retrieved on success: `True` when key is present, `False` otherwise,
non-zero status and an error description on failure.
"""
- return __cache_contains_key(connection, cache, key, key_hint, binary, query_id)
+ return __cache_contains_key(connection, cache_info, key, key_hint, query_id)
-async def cache_contains_key_async(connection: 'AioConnection', cache: Union[str, int], key: Any,
- key_hint: 'IgniteDataType' = None, binary: bool = False,
- query_id: Optional[int] = None) -> 'APIResult':
+async def cache_contains_key_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any,
+ key_hint: 'IgniteDataType' = None, query_id: Optional[int] = None) -> 'APIResult':
"""
Async version of cache_contains_key.
"""
- return await __cache_contains_key(connection, cache, key, key_hint, binary, query_id)
+ return await __cache_contains_key(connection, cache_info, key, key_hint, query_id)
-def __cache_contains_key(connection, cache, key, key_hint, binary, query_id):
+def __cache_contains_key(connection, cache_info, key, key_hint, query_id):
query_struct = Query(
OP_CACHE_CONTAINS_KEY,
[
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
],
query_id=query_id,
@@ -287,8 +266,7 @@ def __cache_contains_key(connection, cache, key, key_hint, binary, query_id):
return query_perform(
query_struct, connection,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
'key': key,
},
response_config=[
@@ -298,16 +276,14 @@ def __cache_contains_key(connection, cache, key, key_hint, binary, query_id):
)
-def cache_contains_keys(connection: 'Connection', cache: Union[str, int], keys: Iterable, binary: bool = False,
+def cache_contains_keys(connection: 'Connection', cache_info: CacheInfo, keys: Iterable,
query_id: Optional[int] = None) -> 'APIResult':
"""
Returns a value indicating whether all given keys are present in cache.
:param connection: connection to Ignite server,
- :param cache: name or ID of the cache,
+ :param cache_info: cache meta info,
:param keys: a list of keys or (key, type hint) tuples,
- :param binary: pass True to keep the value in binary form. False
- by default,
:param query_id: a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
@@ -315,23 +291,22 @@ def cache_contains_keys(connection: 'Connection', cache: Union[str, int], keys:
retrieved on success: `True` when all keys are present, `False` otherwise,
non-zero status and an error description on failure.
"""
- return __cache_contains_keys(connection, cache, keys, binary, query_id)
+ return __cache_contains_keys(connection, cache_info, keys, query_id)
-async def cache_contains_keys_async(connection: 'AioConnection', cache: Union[str, int], keys: Iterable,
- binary: bool = False, query_id: Optional[int] = None) -> 'APIResult':
+async def cache_contains_keys_async(connection: 'AioConnection', cache_info: CacheInfo, keys: Iterable,
+ query_id: Optional[int] = None) -> 'APIResult':
"""
Async version of cache_contains_keys.
"""
- return await __cache_contains_keys(connection, cache, keys, binary, query_id)
+ return await __cache_contains_keys(connection, cache_info, keys, query_id)
-def __cache_contains_keys(connection, cache, keys, binary, query_id):
+def __cache_contains_keys(connection, cache_info, keys, query_id):
query_struct = Query(
OP_CACHE_CONTAINS_KEYS,
[
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
('keys', AnyDataArray()),
],
query_id=query_id,
@@ -339,8 +314,7 @@ def __cache_contains_keys(connection, cache, keys, binary, query_id):
return query_perform(
query_struct, connection,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
'keys': keys,
},
response_config=[
@@ -350,23 +324,21 @@ def __cache_contains_keys(connection, cache, keys, binary, query_id):
)
-def cache_get_and_put(connection: 'Connection', cache: Union[str, int], key: Any, value: Any,
- key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None, binary: bool = False,
+def cache_get_and_put(connection: 'Connection', cache_info: CacheInfo, key: Any, value: Any,
+ key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
query_id: Optional[int] = None) -> 'APIResult':
"""
- Puts a value with a given key to cache, and returns the previous value
+ Puts a value with a given key to cache, and returns the previous value
for that key, or null value if there was not such key.
:param connection: connection to Ignite server,
- :param cache: name or ID of the cache,
+ :param cache_info: cache meta info,
:param key: key for the cache entry. Can be of any supported type,
:param value: value for the key,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:param value_hint: (optional) Ignite data type, for which the given value
should be converted.
- :param binary: pass True to keep the value in binary form. False
- by default,
:param query_id: a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
@@ -374,24 +346,23 @@ def cache_get_and_put(connection: 'Connection', cache: Union[str, int], key: Any
or None if a value is written, non-zero status and an error description
in case of error.
"""
- return __cache_get_and_put(connection, cache, key, value, key_hint, value_hint, binary, query_id)
+ return __cache_get_and_put(connection, cache_info, key, value, key_hint, value_hint, query_id)
-async def cache_get_and_put_async(connection: 'AioConnection', cache: Union[str, int], key: Any, value: Any,
+async def cache_get_and_put_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
- binary: bool = False, query_id: Optional[int] = None) -> 'APIResult':
+ query_id: Optional[int] = None) -> 'APIResult':
"""
Async version of cache_get_and_put.
"""
- return await __cache_get_and_put(connection, cache, key, value, key_hint, value_hint, binary, query_id)
+ return await __cache_get_and_put(connection, cache_info, key, value, key_hint, value_hint, query_id)
-def __cache_get_and_put(connection, cache, key, value, key_hint, value_hint, binary, query_id):
+def __cache_get_and_put(connection, cache_info, key, value, key_hint, value_hint, query_id):
query_struct = Query(
OP_CACHE_GET_AND_PUT,
[
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
('value', value_hint or AnyDataObject),
],
@@ -400,8 +371,7 @@ def __cache_get_and_put(connection, cache, key, value, key_hint, value_hint, bin
return query_perform(
query_struct, connection,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
'key': key,
'value': value,
},
@@ -412,8 +382,8 @@ def __cache_get_and_put(connection, cache, key, value, key_hint, value_hint, bin
)
-def cache_get_and_replace(connection: 'Connection', cache: Union[str, int], key: Any, value: Any,
- key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None, binary: bool = False,
+def cache_get_and_replace(connection: 'Connection', cache_info: CacheInfo, key: Any, value: Any,
+ key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
query_id: Optional[int] = None) -> 'APIResult':
"""
Puts a value with a given key to cache, returning previous value
@@ -421,38 +391,35 @@ def cache_get_and_replace(connection: 'Connection', cache: Union[str, int], key:
for that key.
:param connection: connection to Ignite server,
- :param cache: name or ID of the cache,
+ :param cache_info: cache meta info,
:param key: key for the cache entry. Can be of any supported type,
:param value: value for the key,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:param value_hint: (optional) Ignite data type, for which the given value
should be converted.
- :param binary: pass True to keep the value in binary form. False
- by default,
:param query_id: a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
:return: API result data object. Contains zero status and an old value
or None on success, non-zero status and an error description otherwise.
"""
- return __cache_get_and_replace(connection, cache, key, key_hint, value, value_hint, binary, query_id)
+ return __cache_get_and_replace(connection, cache_info, key, key_hint, value, value_hint, query_id)
-async def cache_get_and_replace_async(connection: 'AioConnection', cache: Union[str, int], key: Any, value: Any,
+async def cache_get_and_replace_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
- binary: bool = False, query_id: Optional[int] = None) -> 'APIResult':
+ query_id: Optional[int] = None) -> 'APIResult':
"""
Async version of cache_get_and_replace.
"""
- return await __cache_get_and_replace(connection, cache, key, key_hint, value, value_hint, binary, query_id)
+ return await __cache_get_and_replace(connection, cache_info, key, key_hint, value, value_hint, query_id)
-def __cache_get_and_replace(connection, cache, key, key_hint, value, value_hint, binary, query_id):
+def __cache_get_and_replace(connection, cache_info, key, key_hint, value, value_hint, query_id):
query_struct = Query(
OP_CACHE_GET_AND_REPLACE, [
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
('value', value_hint or AnyDataObject),
],
@@ -461,8 +428,7 @@ def __cache_get_and_replace(connection, cache, key, key_hint, value, value_hint,
return query_perform(
query_struct, connection,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
'key': key,
'value': value,
},
@@ -473,38 +439,35 @@ def __cache_get_and_replace(connection, cache, key, key_hint, value, value_hint,
)
-def cache_get_and_remove(connection: 'Connection', cache: Union[str, int], key: Any, key_hint: 'IgniteDataType' = None,
- binary: bool = False, query_id: Optional[int] = None) -> 'APIResult':
+def cache_get_and_remove(connection: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
+ query_id: Optional[int] = None) -> 'APIResult':
"""
Removes the cache entry with specified key, returning the value.
:param connection: connection to Ignite server,
- :param cache: name or ID of the cache,
+ :param cache_info: cache meta info,
:param key: key for the cache entry. Can be of any supported type,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
- :param binary: pass True to keep the value in binary form. False
- by default,
:param query_id: a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
:return: API result data object. Contains zero status and an old value
or None, non-zero status and an error description otherwise.
"""
- return __cache_get_and_remove(connection, cache, key, key_hint, binary, query_id)
+ return __cache_get_and_remove(connection, cache_info, key, key_hint, query_id)
-async def cache_get_and_remove_async(connection: 'AioConnection', cache: Union[str, int], key: Any,
- key_hint: 'IgniteDataType' = None, binary: bool = False,
- query_id: Optional[int] = None) -> 'APIResult':
- return await __cache_get_and_remove(connection, cache, key, key_hint, binary, query_id)
+async def cache_get_and_remove_async(
+ connection: 'AioConnection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
+ query_id: Optional[int] = None) -> 'APIResult':
+ return await __cache_get_and_remove(connection, cache_info, key, key_hint, query_id)
-def __cache_get_and_remove(connection, cache, key, key_hint, binary, query_id):
+def __cache_get_and_remove(connection, cache_info, key, key_hint, query_id):
query_struct = Query(
OP_CACHE_GET_AND_REMOVE, [
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
],
query_id=query_id,
@@ -512,8 +475,7 @@ def __cache_get_and_remove(connection, cache, key, key_hint, binary, query_id):
return query_perform(
query_struct, connection,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
'key': key,
},
response_config=[
@@ -523,47 +485,44 @@ def __cache_get_and_remove(connection, cache, key, key_hint, binary, query_id):
)
-def cache_put_if_absent(connection: 'Connection', cache: Union[str, int], key: Any, value: Any,
+def cache_put_if_absent(connection: 'Connection', cache_info: CacheInfo, key: Any, value: Any,
key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
- binary: bool = False, query_id: Optional[int] = None) -> 'APIResult':
+ query_id: Optional[int] = None) -> 'APIResult':
"""
Puts a value with a given key to cache only if the key
does not already exist.
:param connection: connection to Ignite server,
- :param cache: name or ID of the cache,
+ :param cache_info: cache meta info,
:param key: key for the cache entry. Can be of any supported type,
:param value: value for the key,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:param value_hint: (optional) Ignite data type, for which the given value
should be converted.
- :param binary: (optional) pass True to keep the value in binary form. False
- by default,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
:return: API result data object. Contains zero status on success,
non-zero status and an error description otherwise.
"""
- return __cache_put_if_absent(connection, cache, key, value, key_hint, value_hint, binary, query_id)
+ return __cache_put_if_absent(connection, cache_info, key, value, key_hint, value_hint, query_id)
-async def cache_put_if_absent_async(connection: 'AioConnection', cache: Union[str, int], key: Any, value: Any,
+async def cache_put_if_absent_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
- binary: bool = False, query_id: Optional[int] = None) -> 'APIResult':
+ query_id: Optional[int] = None) -> 'APIResult':
"""
Async version of cache_put_if_absent.
"""
- return await __cache_put_if_absent(connection, cache, key, value, key_hint, value_hint, binary, query_id)
+ return await __cache_put_if_absent(connection, cache_info, key, value, key_hint, value_hint, query_id)
-def __cache_put_if_absent(connection, cache, key, value, key_hint, value_hint, binary, query_id):
+def __cache_put_if_absent(connection, cache_info, key, value, key_hint, value_hint, query_id):
query_struct = Query(
OP_CACHE_PUT_IF_ABSENT,
[
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
('value', value_hint or AnyDataObject),
],
@@ -572,8 +531,7 @@ def __cache_put_if_absent(connection, cache, key, value, key_hint, value_hint, b
return query_perform(
query_struct, connection,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
'key': key,
'value': value,
},
@@ -584,47 +542,44 @@ def __cache_put_if_absent(connection, cache, key, value, key_hint, value_hint, b
)
-def cache_get_and_put_if_absent(connection: 'Connection', cache: Union[str, int], key: Any, value: Any,
+def cache_get_and_put_if_absent(connection: 'Connection', cache_info: CacheInfo, key: Any, value: Any,
key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
- binary: bool = False, query_id: Optional[int] = None) -> 'APIResult':
+ query_id: Optional[int] = None) -> 'APIResult':
"""
Puts a value with a given key to cache only if the key does not
already exist.
:param connection: connection to Ignite server,
- :param cache: name or ID of the cache,
+ :param cache_info: cache meta info,
:param key: key for the cache entry. Can be of any supported type,
:param value: value for the key,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:param value_hint: (optional) Ignite data type, for which the given value
should be converted.
- :param binary: (optional) pass True to keep the value in binary form. False
- by default,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
:return: API result data object. Contains zero status and an old value
or None on success, non-zero status and an error description otherwise.
"""
- return __cache_get_and_put_if_absent(connection, cache, key, value, key_hint, value_hint, binary, query_id)
+ return __cache_get_and_put_if_absent(connection, cache_info, key, value, key_hint, value_hint, query_id)
-async def cache_get_and_put_if_absent_async(connection: 'AioConnection', cache: Union[str, int], key: Any, value: Any,
+async def cache_get_and_put_if_absent_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
- binary: bool = False, query_id: Optional[int] = None) -> 'APIResult':
+ query_id: Optional[int] = None) -> 'APIResult':
"""
Async version of cache_get_and_put_if_absent.
"""
- return await __cache_get_and_put_if_absent(connection, cache, key, value, key_hint, value_hint, binary, query_id)
+ return await __cache_get_and_put_if_absent(connection, cache_info, key, value, key_hint, value_hint, query_id)
-def __cache_get_and_put_if_absent(connection, cache, key, value, key_hint, value_hint, binary, query_id):
+def __cache_get_and_put_if_absent(connection, cache_info, key, value, key_hint, value_hint, query_id):
query_struct = Query(
OP_CACHE_GET_AND_PUT_IF_ABSENT,
[
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
('value', value_hint or AnyDataObject),
],
@@ -633,8 +588,7 @@ def __cache_get_and_put_if_absent(connection, cache, key, value, key_hint, value
return query_perform(
query_struct, connection,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
'key': key,
'value': value,
},
@@ -645,22 +599,20 @@ def __cache_get_and_put_if_absent(connection, cache, key, value, key_hint, value
)
-def cache_replace(connection: 'Connection', cache: Union[str, int], key: Any, value: Any,
- key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None, binary: bool = False,
+def cache_replace(connection: 'Connection', cache_info: CacheInfo, key: Any, value: Any,
+ key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
query_id: Optional[int] = None) -> 'APIResult':
"""
Puts a value with a given key to cache only if the key already exist.
:param connection: connection to Ignite server,
- :param cache: name or ID of the cache,
+ :param cache_info: cache meta info,
:param key: key for the cache entry. Can be of any supported type,
:param value: value for the key,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:param value_hint: (optional) Ignite data type, for which the given value
should be converted.
- :param binary: pass True to keep the value in binary form. False
- by default,
:param query_id: a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
@@ -668,24 +620,23 @@ def cache_replace(connection: 'Connection', cache: Union[str, int], key: Any, va
success code, or non-zero status and an error description if something
has gone wrong.
"""
- return __cache_replace(connection, cache, key, value, key_hint, value_hint, binary, query_id)
+ return __cache_replace(connection, cache_info, key, value, key_hint, value_hint, query_id)
-async def cache_replace_async(connection: 'AioConnection', cache: Union[str, int], key: Any, value: Any,
+async def cache_replace_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any, value: Any,
key_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
- binary: bool = False, query_id: Optional[int] = None) -> 'APIResult':
+ query_id: Optional[int] = None) -> 'APIResult':
"""
Async version of cache_replace.
"""
- return await __cache_replace(connection, cache, key, value, key_hint, value_hint, binary, query_id)
+ return await __cache_replace(connection, cache_info, key, value, key_hint, value_hint, query_id)
-def __cache_replace(connection, cache, key, value, key_hint, value_hint, binary, query_id):
+def __cache_replace(connection, cache_info, key, value, key_hint, value_hint, query_id):
query_struct = Query(
OP_CACHE_REPLACE,
[
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
('value', value_hint or AnyDataObject),
],
@@ -694,8 +645,7 @@ def __cache_replace(connection, cache, key, value, key_hint, value_hint, binary,
return query_perform(
query_struct, connection,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
'key': key,
'value': value,
},
@@ -706,16 +656,15 @@ def __cache_replace(connection, cache, key, value, key_hint, value_hint, binary,
)
-def cache_replace_if_equals(connection: 'Connection', cache: Union[str, int], key: Any, sample: Any, value: Any,
+def cache_replace_if_equals(connection: 'Connection', cache_info: CacheInfo, key: Any, sample: Any, value: Any,
key_hint: 'IgniteDataType' = None, sample_hint: 'IgniteDataType' = None,
- value_hint: 'IgniteDataType' = None, binary: bool = False,
- query_id: Optional[int] = None) -> 'APIResult':
+ value_hint: 'IgniteDataType' = None, query_id: Optional[int] = None) -> 'APIResult':
"""
Puts a value with a given key to cache only if the key already exists
and value equals provided sample.
:param connection: connection to Ignite server,
- :param cache: name or ID of the cache,
+ :param cache_info: cache meta info,
:param key: key for the cache entry,
:param sample: a sample to compare the stored value with,
:param value: new value for the given key,
@@ -725,8 +674,6 @@ def cache_replace_if_equals(connection: 'Connection', cache: Union[str, int], ke
the given sample should be converted
:param value_hint: (optional) Ignite data type, for which the given value
should be converted,
- :param binary: (optional) pass True to keep the value in binary form.
- False by default,
:param query_id: (optional) a value generated by client and returned
as-is in response.query_id. When the parameter is omitted, a random
value is generated,
@@ -734,28 +681,26 @@ def cache_replace_if_equals(connection: 'Connection', cache: Union[str, int], ke
success code, or non-zero status and an error description if something
has gone wrong.
"""
- return __cache_replace_if_equals(connection, cache, key, sample, value, key_hint, sample_hint, value_hint, binary,
- query_id)
+ return __cache_replace_if_equals(connection, cache_info, key, sample, value, key_hint,
+ sample_hint, value_hint, query_id)
async def cache_replace_if_equals_async(
- connection: 'AioConnection', cache: Union[str, int], key: Any, sample: Any, value: Any,
+ connection: 'AioConnection', cache_info: CacheInfo, key: Any, sample: Any, value: Any,
key_hint: 'IgniteDataType' = None, sample_hint: 'IgniteDataType' = None, value_hint: 'IgniteDataType' = None,
- binary: bool = False, query_id: Optional[int] = None) -> 'APIResult':
+ query_id: Optional[int] = None) -> 'APIResult':
"""
Async version of cache_replace_if_equals.
"""
- return await __cache_replace_if_equals(connection, cache, key, sample, value, key_hint, sample_hint, value_hint,
- binary, query_id)
+ return await __cache_replace_if_equals(connection, cache_info, key, sample, value, key_hint,
+ sample_hint, value_hint, query_id)
-def __cache_replace_if_equals(connection, cache, key, sample, value, key_hint, sample_hint, value_hint, binary,
- query_id):
+def __cache_replace_if_equals(connection, cache_info, key, sample, value, key_hint, sample_hint, value_hint, query_id):
query_struct = Query(
OP_CACHE_REPLACE_IF_EQUALS,
[
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
('sample', sample_hint or AnyDataObject),
('value', value_hint or AnyDataObject),
@@ -765,8 +710,7 @@ def __cache_replace_if_equals(connection, cache, key, sample, value, key_hint, s
return query_perform(
query_struct, connection,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
'key': key,
'sample': sample,
'value': value,
@@ -778,86 +722,77 @@ def __cache_replace_if_equals(connection, cache, key, sample, value, key_hint, s
)
-def cache_clear(connection: 'Connection', cache: Union[str, int], binary: bool = False,
- query_id: Optional[int] = None) -> 'APIResult':
+def cache_clear(connection: 'Connection', cache_info: CacheInfo, query_id: Optional[int] = None) -> 'APIResult':
"""
Clears the cache without notifying listeners or cache writers.
:param connection: connection to Ignite server,
- :param cache: name or ID of the cache,
- :param binary: (optional) pass True to keep the value in binary form.
- False by default,
+ :param cache_info: cache meta info,
:param query_id: (optional) a value generated by client and returned
as-is in response.query_id. When the parameter is omitted, a random
value is generated,
:return: API result data object. Contains zero status on success,
non-zero status and an error description otherwise.
"""
- return __cache_clear(connection, cache, binary, query_id)
+ return __cache_clear(connection, cache_info, query_id)
-async def cache_clear_async(connection: 'AioConnection', cache: Union[str, int], binary: bool = False,
- query_id: Optional[int] = None) -> 'APIResult':
+async def cache_clear_async(
+ connection: 'AioConnection', cache_info: CacheInfo, query_id: Optional[int] = None) -> 'APIResult':
"""
Async version of cache_clear.
"""
- return await __cache_clear(connection, cache, binary, query_id)
+ return await __cache_clear(connection, cache_info, query_id)
-def __cache_clear(connection, cache, binary, query_id):
+def __cache_clear(connection, cache_info, query_id):
query_struct = Query(
OP_CACHE_CLEAR,
[
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
],
query_id=query_id,
)
return query_perform(
query_struct, connection,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
},
)
-def cache_clear_key(connection: 'Connection', cache: Union[str, int], key: Any, key_hint: 'IgniteDataType' = None,
- binary: bool = False, query_id: Optional[int] = None) -> 'APIResult':
+def cache_clear_key(connection: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
+ query_id: Optional[int] = None) -> 'APIResult':
"""
Clears the cache key without notifying listeners or cache writers.
:param connection: connection to Ignite server,
- :param cache: name or ID of the cache,
+ :param cache_info: cache meta info,
:param key: key for the cache entry,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
- :param binary: (optional) pass True to keep the value in binary form.
- False by default,
:param query_id: (optional) a value generated by client and returned
as-is in response.query_id. When the parameter is omitted, a random
value is generated,
:return: API result data object. Contains zero status on success,
non-zero status and an error description otherwise.
"""
- return __cache_clear_key(connection, cache, key, key_hint, binary, query_id)
+ return __cache_clear_key(connection, cache_info, key, key_hint, query_id)
-async def cache_clear_key_async(connection: 'AioConnection', cache: Union[str, int], key: Any,
- key_hint: 'IgniteDataType' = None, binary: bool = False,
- query_id: Optional[int] = None) -> 'APIResult':
+async def cache_clear_key_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any,
+ key_hint: 'IgniteDataType' = None, query_id: Optional[int] = None) -> 'APIResult':
"""
Async version of cache_clear_key.
"""
- return await __cache_clear_key(connection, cache, key, key_hint, binary, query_id)
+ return await __cache_clear_key(connection, cache_info, key, key_hint, query_id)
-def __cache_clear_key(connection, cache, key, key_hint, binary, query_id):
+def __cache_clear_key(connection, cache_info, key, key_hint, query_id):
query_struct = Query(
OP_CACHE_CLEAR_KEY,
[
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
],
query_id=query_id,
@@ -865,46 +800,43 @@ def __cache_clear_key(connection, cache, key, key_hint, binary, query_id):
return query_perform(
query_struct, connection,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
'key': key,
},
)
-def cache_clear_keys(connection: 'Connection', cache: Union[str, int], keys: Iterable, binary: bool = False,
- query_id: Optional[int] = None) -> 'APIResult':
+def cache_clear_keys(
+ connection: 'Connection', cache_info: CacheInfo, keys: Iterable, query_id: Optional[int] = None) -> 'APIResult':
"""
Clears the cache keys without notifying listeners or cache writers.
:param connection: connection to Ignite server,
- :param cache: name or ID of the cache,
+ :param cache_info: cache meta info,
:param keys: list of keys or tuples of (key, key_hint),
- :param binary: (optional) pass True to keep the value in binary form.
- False by default,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
:return: API result data object. Contains zero status on success,
non-zero status and an error description otherwise.
"""
- return __cache_clear_keys(connection, cache, keys, binary, query_id)
+ return __cache_clear_keys(connection, cache_info, keys, query_id)
-async def cache_clear_keys_async(connection: 'AioConnection', cache: Union[str, int], keys: Iterable,
- binary: bool = False, query_id: Optional[int] = None) -> 'APIResult':
+async def cache_clear_keys_async(
+ connection: 'AioConnection', cache_info: CacheInfo, keys: Iterable, query_id: Optional[int] = None
+) -> 'APIResult':
"""
Async version of cache_clear_keys.
"""
- return await __cache_clear_keys(connection, cache, keys, binary, query_id)
+ return await __cache_clear_keys(connection, cache_info, keys, query_id)
-def __cache_clear_keys(connection, cache, keys, binary, query_id):
+def __cache_clear_keys(connection, cache_info, keys, query_id):
query_struct = Query(
OP_CACHE_CLEAR_KEYS,
[
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
('keys', AnyDataArray()),
],
query_id=query_id,
@@ -912,25 +844,22 @@ def __cache_clear_keys(connection, cache, keys, binary, query_id):
return query_perform(
query_struct, connection,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
'keys': keys,
},
)
-def cache_remove_key(connection: 'Connection', cache: Union[str, int], key: Any, key_hint: 'IgniteDataType' = None,
- binary: bool = False, query_id: Optional[int] = None) -> 'APIResult':
+def cache_remove_key(connection: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
+ query_id: Optional[int] = None) -> 'APIResult':
"""
Clears the cache key without notifying listeners or cache writers.
:param connection: connection to Ignite server,
- :param cache: name or ID of the cache,
+ :param cache_info: cache meta info,
:param key: key for the cache entry,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
- :param binary: (optional) pass True to keep the value in binary form.
- False by default,
:param query_id: (optional) a value generated by client and returned
as-is in response.query_id. When the parameter is omitted, a random
value is generated,
@@ -938,24 +867,22 @@ def cache_remove_key(connection: 'Connection', cache: Union[str, int], key: Any,
success code, or non-zero status and an error description if something
has gone wrong.
"""
- return __cache_remove_key(connection, cache, key, key_hint, binary, query_id)
+ return __cache_remove_key(connection, cache_info, key, key_hint, query_id)
-async def cache_remove_key_async(connection: 'AioConnection', cache: Union[str, int], key: Any,
- key_hint: 'IgniteDataType' = None, binary: bool = False,
- query_id: Optional[int] = None) -> 'APIResult':
+async def cache_remove_key_async(connection: 'AioConnection', cache_info: CacheInfo, key: Any,
+ key_hint: 'IgniteDataType' = None, query_id: Optional[int] = None) -> 'APIResult':
"""
Async version of cache_remove_key.
"""
- return await __cache_remove_key(connection, cache, key, key_hint, binary, query_id)
+ return await __cache_remove_key(connection, cache_info, key, key_hint, query_id)
-def __cache_remove_key(connection, cache, key, key_hint, binary, query_id):
+def __cache_remove_key(connection, cache_info, key, key_hint, query_id):
query_struct = Query(
OP_CACHE_REMOVE_KEY,
[
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
],
query_id=query_id,
@@ -963,8 +890,7 @@ def __cache_remove_key(connection, cache, key, key_hint, binary, query_id):
return query_perform(
query_struct, connection,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
'key': key,
},
response_config=[
@@ -974,23 +900,21 @@ def __cache_remove_key(connection, cache, key, key_hint, binary, query_id):
)
-def cache_remove_if_equals(connection: 'Connection', cache: Union[str, int], key: Any, sample: Any,
+def cache_remove_if_equals(connection: 'Connection', cache_info: CacheInfo, key: Any, sample: Any,
key_hint: 'IgniteDataType' = None, sample_hint: 'IgniteDataType' = None,
- binary: bool = False, query_id: Optional[int] = None) -> 'APIResult':
+ query_id: Optional[int] = None) -> 'APIResult':
"""
Removes an entry with a given key if provided value is equal to
actual value, notifying listeners and cache writers.
:param connection: connection to Ignite server,
- :param cache: name or ID of the cache,
+ :param cache_info: cache meta info,
:param key: key for the cache entry,
:param sample: a sample to compare the stored value with,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:param sample_hint: (optional) Ignite data type, for whic
the given sample should be converted
- :param binary: (optional) pass True to keep the value in binary form.
- False by default,
:param query_id: (optional) a value generated by client and returned
as-is in response.query_id. When the parameter is omitted, a random
value is generated,
@@ -998,24 +922,23 @@ def cache_remove_if_equals(connection: 'Connection', cache: Union[str, int], key
success code, or non-zero status and an error description if something
has gone wrong.
"""
- return __cache_remove_if_equals(connection, cache, key, sample, key_hint, sample_hint, binary, query_id)
+ return __cache_remove_if_equals(connection, cache_info, key, sample, key_hint, sample_hint, query_id)
async def cache_remove_if_equals_async(
- connection: 'AioConnection', cache: Union[str, int], key: Any, sample: Any, key_hint: 'IgniteDataType' = None,
- sample_hint: 'IgniteDataType' = None, binary: bool = False, query_id: Optional[int] = None) -> 'APIResult':
+ connection: 'AioConnection', cache_info: CacheInfo, key: Any, sample: Any, key_hint: 'IgniteDataType' = None,
+ sample_hint: 'IgniteDataType' = None, query_id: Optional[int] = None) -> 'APIResult':
"""
Async version of cache_remove_if_equals.
"""
- return await __cache_remove_if_equals(connection, cache, key, sample, key_hint, sample_hint, binary, query_id)
+ return await __cache_remove_if_equals(connection, cache_info, key, sample, key_hint, sample_hint, query_id)
-def __cache_remove_if_equals(connection, cache, key, sample, key_hint, sample_hint, binary, query_id):
+def __cache_remove_if_equals(connection, cache_info, key, sample, key_hint, sample_hint, query_id):
query_struct = Query(
OP_CACHE_REMOVE_IF_EQUALS,
[
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
('sample', sample_hint or AnyDataObject),
],
@@ -1024,8 +947,7 @@ def __cache_remove_if_equals(connection, cache, key, sample, key_hint, sample_hi
return query_perform(
query_struct, connection,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
'key': key,
'sample': sample,
},
@@ -1036,39 +958,37 @@ def __cache_remove_if_equals(connection, cache, key, sample, key_hint, sample_hi
)
-def cache_remove_keys(connection: 'Connection', cache: Union[str, int], keys: Iterable, binary: bool = False,
- query_id: Optional[int] = None) -> 'APIResult':
+def cache_remove_keys(
+ connection: 'Connection', cache_info: CacheInfo, keys: Iterable, query_id: Optional[int] = None) -> 'APIResult':
"""
Removes entries with given keys, notifying listeners and cache writers.
:param connection: connection to Ignite server,
- :param cache: name or ID of the cache,
+ :param cache_info: cache meta info,
:param keys: list of keys or tuples of (key, key_hint),
- :param binary: (optional) pass True to keep the value in binary form.
- False by default,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
:return: API result data object. Contains zero status on success,
non-zero status and an error description otherwise.
"""
- return __cache_remove_keys(connection, cache, keys, binary, query_id)
+ return __cache_remove_keys(connection, cache_info, keys, query_id)
-async def cache_remove_keys_async(connection: 'AioConnection', cache: Union[str, int], keys: Iterable,
- binary: bool = False, query_id: Optional[int] = None) -> 'APIResult':
+async def cache_remove_keys_async(
+ connection: 'AioConnection', cache_info: CacheInfo, keys: Iterable, query_id: Optional[int] = None
+) -> 'APIResult':
"""
Async version of cache_remove_keys.
"""
- return await __cache_remove_keys(connection, cache, keys, binary, query_id)
+ return await __cache_remove_keys(connection, cache_info, keys, query_id)
-def __cache_remove_keys(connection, cache, keys, binary, query_id):
+def __cache_remove_keys(connection, cache_info, keys, query_id):
query_struct = Query(
OP_CACHE_REMOVE_KEYS,
[
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
('keys', AnyDataArray()),
],
query_id=query_id,
@@ -1076,69 +996,61 @@ def __cache_remove_keys(connection, cache, keys, binary, query_id):
return query_perform(
query_struct, connection,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
'keys': keys,
},
)
-def cache_remove_all(connection: 'Connection', cache: Union[str, int], binary: bool = False,
- query_id: Optional[int] = None) -> 'APIResult':
+def cache_remove_all(connection: 'Connection', cache_info: CacheInfo, query_id: Optional[int] = None) -> 'APIResult':
"""
- Removes all entries from cache, notifying listeners and cache writers.
+ Removes all entries from cache, notifying listeners and cache writers.
:param connection: connection to Ignite server,
- :param cache: name or ID of the cache,
- :param binary: (optional) pass True to keep the value in binary form.
- False by default,
+ :param cache_info: cache meta info,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
:return: API result data object. Contains zero status on success,
non-zero status and an error description otherwise.
"""
- return __cache_remove_all(connection, cache, binary, query_id)
+ return __cache_remove_all(connection, cache_info, query_id)
-async def cache_remove_all_async(connection: 'AioConnection', cache: Union[str, int], binary: bool = False,
- query_id: Optional[int] = None) -> 'APIResult':
+async def cache_remove_all_async(
+ connection: 'AioConnection', cache_info: CacheInfo, query_id: Optional[int] = None) -> 'APIResult':
"""
Async version of cache_remove_all.
"""
- return await __cache_remove_all(connection, cache, binary, query_id)
+ return await __cache_remove_all(connection, cache_info, query_id)
-def __cache_remove_all(connection, cache, binary, query_id):
+def __cache_remove_all(connection, cache_info, query_id):
query_struct = Query(
OP_CACHE_REMOVE_ALL,
[
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
],
query_id=query_id,
)
return query_perform(
query_struct, connection,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
},
)
-def cache_get_size(connection: 'Connection', cache: Union[str, int], peek_modes: Union[int, list, tuple] = None,
- binary: bool = False, query_id: Optional[int] = None) -> 'APIResult':
+def cache_get_size(connection: 'Connection', cache_info: CacheInfo, peek_modes: Union[int, list, tuple] = None,
+ query_id: Optional[int] = None) -> 'APIResult':
"""
Gets the number of entries in cache.
:param connection: connection to Ignite server,
- :param cache: name or ID of the cache,
+ :param cache_info: cache meta info,
:param peek_modes: (optional) limit count to near cache partition
(PeekModes.NEAR), primary cache (PeekModes.PRIMARY), or backup cache
(PeekModes.BACKUP). Defaults to pimary cache partitions (PeekModes.PRIMARY),
- :param binary: (optional) pass True to keep the value in binary form.
- False by default,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
@@ -1146,16 +1058,17 @@ def cache_get_size(connection: 'Connection', cache: Union[str, int], peek_modes:
cache entries on success, non-zero status and an error description
otherwise.
"""
- return __cache_get_size(connection, cache, peek_modes, binary, query_id)
+ return __cache_get_size(connection, cache_info, peek_modes, query_id)
-async def cache_get_size_async(connection: 'AioConnection', cache: Union[str, int],
- peek_modes: Union[int, list, tuple] = None, binary: bool = False,
- query_id: Optional[int] = None) -> 'APIResult':
- return await __cache_get_size(connection, cache, peek_modes, binary, query_id)
+async def cache_get_size_async(
+ connection: 'AioConnection', cache_info: CacheInfo, peek_modes: Union[int, list, tuple] = None,
+ query_id: Optional[int] = None
+) -> 'APIResult':
+ return await __cache_get_size(connection, cache_info, peek_modes, query_id)
-def __cache_get_size(connection, cache, peek_modes, binary, query_id):
+def __cache_get_size(connection, cache_info, peek_modes, query_id):
if peek_modes is None:
peek_modes = []
elif not isinstance(peek_modes, (list, tuple)):
@@ -1164,8 +1077,7 @@ def __cache_get_size(connection, cache, peek_modes, binary, query_id):
query_struct = Query(
OP_CACHE_GET_SIZE,
[
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
('peek_modes', ByteArray),
],
query_id=query_id,
@@ -1173,8 +1085,7 @@ def __cache_get_size(connection, cache, peek_modes, binary, query_id):
return query_perform(
query_struct, connection,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
'peek_modes': peek_modes,
},
response_config=[
@@ -1184,9 +1095,8 @@ def __cache_get_size(connection, cache, peek_modes, binary, query_id):
)
-def cache_local_peek(conn: 'Connection', cache: Union[str, int], key: Any, key_hint: 'IgniteDataType' = None,
- peek_modes: Union[int, list, tuple] = None, binary: bool = False,
- query_id: Optional[int] = None) -> 'APIResult':
+def cache_local_peek(conn: 'Connection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
+ peek_modes: Union[int, list, tuple] = None, query_id: Optional[int] = None) -> 'APIResult':
"""
Peeks at in-memory cached value using default optional peek mode.
@@ -1194,35 +1104,33 @@ def cache_local_peek(conn: 'Connection', cache: Union[str, int], key: Any, key_h
node.
:param conn: connection: connection to Ignite server,
- :param cache: name or ID of the cache,
+ :param cache_info: cache meta info,
:param key: entry key,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:param peek_modes: (optional) limit count to near cache partition
(PeekModes.NEAR), primary cache (PeekModes.PRIMARY), or backup cache
(PeekModes.BACKUP). Defaults to primary cache partitions (PeekModes.PRIMARY),
- :param binary: (optional) pass True to keep the value in binary form.
- False by default,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
:return: API result data object. Contains zero status and a peeked value
(null if not found).
"""
- return __cache_local_peek(conn, cache, key, key_hint, peek_modes, binary, query_id)
+ return __cache_local_peek(conn, cache_info, key, key_hint, peek_modes, query_id)
async def cache_local_peek_async(
- conn: 'AioConnection', cache: Union[str, int], key: Any, key_hint: 'IgniteDataType' = None,
- peek_modes: Union[int, list, tuple] = None, binary: bool = False,
- query_id: Optional[int] = None) -> 'APIResult':
+ conn: 'AioConnection', cache_info: CacheInfo, key: Any, key_hint: 'IgniteDataType' = None,
+ peek_modes: Union[int, list, tuple] = None, query_id: Optional[int] = None
+) -> 'APIResult':
"""
Async version of cache_local_peek.
"""
- return await __cache_local_peek(conn, cache, key, key_hint, peek_modes, binary, query_id)
+ return await __cache_local_peek(conn, cache_info, key, key_hint, peek_modes, query_id)
-def __cache_local_peek(conn, cache, key, key_hint, peek_modes, binary, query_id):
+def __cache_local_peek(conn, cache_info, key, key_hint, peek_modes, query_id):
if peek_modes is None:
peek_modes = []
elif not isinstance(peek_modes, (list, tuple)):
@@ -1231,8 +1139,7 @@ def __cache_local_peek(conn, cache, key, key_hint, peek_modes, binary, query_id)
query_struct = Query(
OP_CACHE_LOCAL_PEEK,
[
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
('key', key_hint or AnyDataObject),
('peek_modes', ByteArray),
],
@@ -1241,8 +1148,7 @@ def __cache_local_peek(conn, cache, key, key_hint, peek_modes, binary, query_id)
return query_perform(
query_struct, conn,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
'key': key,
'peek_modes': peek_modes,
},
diff --git a/pyignite/api/sql.py b/pyignite/api/sql.py
index b10cc7d..267bc5b 100644
--- a/pyignite/api/sql.py
+++ b/pyignite/api/sql.py
@@ -13,35 +13,32 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import Union
-
from pyignite.connection import AioConnection, Connection
-from pyignite.datatypes import AnyDataArray, AnyDataObject, Bool, Byte, Int, Long, Map, Null, String, StructArray
+from pyignite.datatypes import AnyDataArray, AnyDataObject, Bool, Int, Long, Map, Null, String, StructArray
from pyignite.datatypes.sql import StatementType
from pyignite.queries import Query, query_perform
from pyignite.queries.op_codes import (
OP_QUERY_SCAN, OP_QUERY_SCAN_CURSOR_GET_PAGE, OP_QUERY_SQL, OP_QUERY_SQL_CURSOR_GET_PAGE, OP_QUERY_SQL_FIELDS,
OP_QUERY_SQL_FIELDS_CURSOR_GET_PAGE, OP_RESOURCE_CLOSE
)
-from pyignite.utils import cache_id, deprecated
+from pyignite.utils import deprecated
from .result import APIResult
+from ..queries.query import CacheInfo
from ..queries.response import SQLResponse
-def scan(conn: 'Connection', cache: Union[str, int], page_size: int, partitions: int = -1, local: bool = False,
- binary: bool = False, query_id: int = None) -> APIResult:
+def scan(conn: 'Connection', cache_info: CacheInfo, page_size: int, partitions: int = -1, local: bool = False,
+ query_id: int = None) -> APIResult:
"""
Performs scan query.
:param conn: connection to Ignite server,
- :param cache: name or ID of the cache,
+ :param cache_info: cache meta info.
:param page_size: cursor page size,
:param partitions: (optional) number of partitions to query
(negative to query entire cache),
:param local: (optional) pass True if this query should be executed
on local node only. Defaults to False,
- :param binary: (optional) pass True to keep the value in binary form.
- False by default,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
@@ -56,15 +53,15 @@ def scan(conn: 'Connection', cache: Union[str, int], page_size: int, partitions:
* `more`: bool, True if more data is available for subsequent
‘scan_cursor_get_page’ calls.
"""
- return __scan(conn, cache, page_size, partitions, local, binary, query_id)
+ return __scan(conn, cache_info, page_size, partitions, local, query_id)
-async def scan_async(conn: 'AioConnection', cache: Union[str, int], page_size: int, partitions: int = -1,
- local: bool = False, binary: bool = False, query_id: int = None) -> APIResult:
+async def scan_async(conn: 'AioConnection', cache_info: CacheInfo, page_size: int, partitions: int = -1,
+ local: bool = False, query_id: int = None) -> APIResult:
"""
Async version of scan.
"""
- return await __scan(conn, cache, page_size, partitions, local, binary, query_id)
+ return await __scan(conn, cache_info, page_size, partitions, local, query_id)
def __query_result_post_process(result):
@@ -73,12 +70,11 @@ def __query_result_post_process(result):
return result
-def __scan(conn, cache, page_size, partitions, local, binary, query_id):
+def __scan(conn, cache_info, page_size, partitions, local, query_id):
query_struct = Query(
OP_QUERY_SCAN,
[
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
('filter', Null),
('page_size', Int),
('partitions', Int),
@@ -89,8 +85,7 @@ def __scan(conn, cache, page_size, partitions, local, binary, query_id):
return query_perform(
query_struct, conn,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
'filter': None,
'page_size': page_size,
'partitions': partitions,
@@ -156,18 +151,17 @@ def __scan_cursor_get_page(conn, cursor, query_id):
@deprecated(version='1.2.0', reason="This API is deprecated and will be removed in the following major release. "
"Use sql_fields instead")
def sql(
- conn: 'Connection', cache: Union[str, int],
+ conn: 'Connection', cache_info: CacheInfo,
table_name: str, query_str: str, page_size: int, query_args=None,
distributed_joins: bool = False, replicated_only: bool = False,
- local: bool = False, timeout: int = 0, binary: bool = False,
- query_id: int = None
+ local: bool = False, timeout: int = 0, query_id: int = None
) -> APIResult:
"""
Executes an SQL query over data stored in the cluster. The query returns
the whole record (key and value).
:param conn: connection to Ignite server,
- :param cache: name or ID of the cache,
+ :param cache_info: Cache meta info,
:param table_name: name of a type or SQL table,
:param query_str: SQL query string,
:param page_size: cursor page size,
@@ -179,8 +173,6 @@ def sql(
on local node only. Defaults to False,
:param timeout: (optional) non-negative timeout value in ms. Zero disables
timeout (default),
- :param binary: (optional) pass True to keep the value in binary form.
- False by default,
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
@@ -202,8 +194,7 @@ def sql(
query_struct = Query(
OP_QUERY_SQL,
[
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
('table_name', String),
('query_str', String),
('query_args', AnyDataArray()),
@@ -218,8 +209,7 @@ def sql(
result = query_struct.perform(
conn,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
'table_name': table_name,
'query_str': query_str,
'query_args': query_args,
@@ -287,19 +277,19 @@ def sql_cursor_get_page(
def sql_fields(
- conn: 'Connection', cache: Union[str, int],
+ conn: 'Connection', cache_info: CacheInfo,
query_str: str, page_size: int, query_args=None, schema: str = None,
statement_type: int = StatementType.ANY, distributed_joins: bool = False,
local: bool = False, replicated_only: bool = False,
enforce_join_order: bool = False, collocated: bool = False,
lazy: bool = False, include_field_names: bool = False, max_rows: int = -1,
- timeout: int = 0, binary: bool = False, query_id: int = None
+ timeout: int = 0, query_id: int = None
) -> APIResult:
"""
Performs SQL fields query.
:param conn: connection to Ignite server,
- :param cache: name or ID of the cache. If zero, then schema is used.
+ :param cache_info: cache meta info.
:param query_str: SQL query string,
:param page_size: cursor page size,
:param query_args: (optional) query arguments. List of values or
@@ -307,9 +297,9 @@ def sql_fields(
:param schema: schema for the query.
:param statement_type: (optional) statement type. Can be:
- * StatementType.ALL − any type (default),
+ * StatementType.ANY − any type (default),
* StatementType.SELECT − select,
- * StatementType.UPDATE − update.
+ * StatementType.UPDATE − update.
:param distributed_joins: (optional) distributed joins.
:param local: (optional) pass True if this query should be executed
@@ -323,7 +313,6 @@ def sql_fields(
:param max_rows: (optional) query-wide maximum of rows.
:param timeout: (optional) non-negative timeout value in ms. Zero disables
timeout.
- :param binary: (optional) pass True to keep the value in binary form.
:param query_id: (optional) a value generated by client and returned as-is
in response.query_id. When the parameter is omitted, a random value
is generated,
@@ -338,39 +327,39 @@ def sql_fields(
* `more`: bool, True if more data is available for subsequent
‘sql_fields_cursor_get_page’ calls.
"""
- return __sql_fields(conn, cache, query_str, page_size, query_args, schema, statement_type, distributed_joins,
+ return __sql_fields(conn, cache_info, query_str, page_size, query_args, schema, statement_type, distributed_joins,
local, replicated_only, enforce_join_order, collocated, lazy, include_field_names, max_rows,
- timeout, binary, query_id)
+ timeout, query_id)
async def sql_fields_async(
- conn: 'AioConnection', cache: Union[str, int],
+ conn: 'AioConnection', cache_info: CacheInfo,
query_str: str, page_size: int, query_args=None, schema: str = None,
statement_type: int = StatementType.ANY, distributed_joins: bool = False,
local: bool = False, replicated_only: bool = False,
enforce_join_order: bool = False, collocated: bool = False,
lazy: bool = False, include_field_names: bool = False, max_rows: int = -1,
- timeout: int = 0, binary: bool = False, query_id: int = None
+ timeout: int = 0, query_id: int = None
) -> APIResult:
"""
Async version of sql_fields.
"""
- return await __sql_fields(conn, cache, query_str, page_size, query_args, schema, statement_type, distributed_joins,
- local, replicated_only, enforce_join_order, collocated, lazy, include_field_names,
- max_rows, timeout, binary, query_id)
+ return await __sql_fields(conn, cache_info, query_str, page_size, query_args, schema, statement_type,
+ distributed_joins, local, replicated_only, enforce_join_order, collocated, lazy,
+ include_field_names, max_rows, timeout, query_id)
-def __sql_fields(conn, cache, query_str, page_size, query_args, schema, statement_type, distributed_joins, local,
- replicated_only, enforce_join_order, collocated, lazy, include_field_names, max_rows, timeout,
- binary, query_id):
+def __sql_fields(
+ conn, cache_info, query_str, page_size, query_args, schema, statement_type, distributed_joins, local,
+ replicated_only, enforce_join_order, collocated, lazy, include_field_names, max_rows, timeout, query_id
+):
if query_args is None:
query_args = []
query_struct = Query(
OP_QUERY_SQL_FIELDS,
[
- ('hash_code', Int),
- ('flag', Byte),
+ ('cache_info', CacheInfo),
('schema', String),
('page_size', Int),
('max_rows', Int),
@@ -393,8 +382,7 @@ def __sql_fields(conn, cache, query_str, page_size, query_args, schema, statemen
return query_perform(
query_struct, conn,
query_params={
- 'hash_code': cache_id(cache),
- 'flag': 1 if binary else 0,
+ 'cache_info': cache_info,
'schema': schema,
'page_size': page_size,
'max_rows': max_rows,
diff --git a/pyignite/cache.py b/pyignite/cache.py
index 3c93637..a2444a4 100644
--- a/pyignite/cache.py
+++ b/pyignite/cache.py
@@ -15,9 +15,10 @@
from typing import Any, Iterable, Optional, Tuple, Union
-from .datatypes import prop_codes
+from .datatypes import prop_codes, ExpiryPolicy
from .datatypes.internal import AnyDataObject
-from .exceptions import CacheCreationError, CacheError, ParameterError, SQLError
+from .exceptions import CacheCreationError, CacheError, ParameterError, SQLError, NotSupportedByClusterError
+from .queries.query import CacheInfo
from .utils import cache_id, status_to_exception
from .api.cache_config import (
cache_create, cache_create_with_config, cache_get_or_create, cache_get_or_create_with_config, cache_destroy,
@@ -93,12 +94,14 @@ def __parse_settings(settings: Union[str, dict]) -> Tuple[Optional[str], Optiona
class BaseCache:
- def __init__(self, client: 'BaseClient', name: str):
+ def __init__(self, client: 'BaseClient', name: str, expiry_policy: ExpiryPolicy = None):
self._client = client
self._name = name
self._settings = None
- self._cache_id = cache_id(self._name)
- self._client.register_cache(self._cache_id)
+ self._cache_info = CacheInfo(cache_id=cache_id(self._name),
+ protocol_context=client.protocol_context,
+ expiry_policy=expiry_policy)
+ self._client.register_cache(self.cache_info.cache_id)
@property
def name(self) -> str:
@@ -115,13 +118,43 @@ class BaseCache:
return self._client
@property
+ def cache_info(self) -> CacheInfo:
+ """
+ Cache meta info.
+ """
+ return self._cache_info
+
+ @property
def cache_id(self) -> int:
"""
Cache ID.
:return: integer value of the cache ID.
"""
- return self._cache_id
+ return self._cache_info.cache_id
+
+ def with_expire_policy(
+ self, expiry_policy: Optional[ExpiryPolicy] = None,
+ create: Union[int, float] = ExpiryPolicy.UNCHANGED,
+ update: Union[int, float] = ExpiryPolicy.UNCHANGED,
+ access: Union[int, float] = ExpiryPolicy.UNCHANGED
+ ):
+ """
+ :param expiry_policy: optional :class:`~pyignite.datatypes.expiry_policy.ExpiryPolicy`
+ object. If it is set, other params will be ignored.
+ :param create: create TTL in seconds (float) or milliseconds (int),
+ :param update: update TTL in seconds (float) or milliseconds (int),
+ :param access: access TTL in seconds (float) or milliseconds (int).
+ :return: cache decorator with expiry policy set.
+ """
+ if not self.client.protocol_context.is_expiry_policy_supported():
+ raise NotSupportedByClusterError("'ExpiryPolicy' API is not supported by the cluster")
+
+ cache_cls = type(self)
+ if not expiry_policy:
+ expiry_policy = ExpiryPolicy(create=create, update=update, access=access)
+
+ return cache_cls(self.client, self.name, expiry_policy)
class Cache(BaseCache):
@@ -134,17 +167,17 @@ class Cache(BaseCache):
:ref:`this example <create_cache>` on how to do it.
"""
- def __init__(self, client: 'Client', name: str):
+ def __init__(self, client: 'Client', name: str, expiry_policy: ExpiryPolicy = None):
"""
Initialize cache object. For internal use.
:param client: Ignite client,
:param name: Cache name.
"""
- super().__init__(client, name)
+ super().__init__(client, name, expiry_policy)
def _get_best_node(self, key=None, key_hint=None):
- return self.client.get_best_node(self._cache_id, key, key_hint)
+ return self.client.get_best_node(self, key, key_hint)
@property
def settings(self) -> Optional[dict]:
@@ -159,7 +192,7 @@ class Cache(BaseCache):
if self._settings is None:
config_result = cache_get_configuration(
self._get_best_node(),
- self._cache_id
+ self.cache_info
)
if config_result.status == 0:
self._settings = config_result.value
@@ -173,7 +206,7 @@ class Cache(BaseCache):
"""
Destroys cache with a given name.
"""
- return cache_destroy(self._get_best_node(), self._cache_id)
+ return cache_destroy(self._get_best_node(), self.cache_id)
@status_to_exception(CacheError)
def get(self, key, key_hint: object = None) -> Any:
@@ -190,7 +223,7 @@ class Cache(BaseCache):
result = cache_get(
self._get_best_node(key, key_hint),
- self._cache_id,
+ self.cache_info,
key,
key_hint=key_hint
)
@@ -198,9 +231,7 @@ class Cache(BaseCache):
return result
@status_to_exception(CacheError)
- def put(
- self, key, value, key_hint: object = None, value_hint: object = None
- ):
+ def put(self, key, value, key_hint: object = None, value_hint: object = None):
"""
Puts a value with a given key to cache (overwriting existing value
if any).
@@ -217,7 +248,7 @@ class Cache(BaseCache):
return cache_put(
self._get_best_node(key, key_hint),
- self._cache_id, key, value,
+ self.cache_info, key, value,
key_hint=key_hint, value_hint=value_hint
)
@@ -229,7 +260,7 @@ class Cache(BaseCache):
:param keys: list of keys or tuples of (key, key_hint),
:return: a dict of key-value pairs.
"""
- result = cache_get_all(self._get_best_node(), self._cache_id, keys)
+ result = cache_get_all(self._get_best_node(), self.cache_info, keys)
if result.value:
for key, value in result.value.items():
result.value[key] = self.client.unwrap_binary(value)
@@ -245,7 +276,7 @@ class Cache(BaseCache):
to save. Each key or value can be an item of representable
Python type or a tuple of (item, hint),
"""
- return cache_put_all(self._get_best_node(), self._cache_id, pairs)
+ return cache_put_all(self._get_best_node(), self.cache_info, pairs)
@status_to_exception(CacheError)
def replace(
@@ -266,7 +297,7 @@ class Cache(BaseCache):
result = cache_replace(
self._get_best_node(key, key_hint),
- self._cache_id, key, value,
+ self.cache_info, key, value,
key_hint=key_hint, value_hint=value_hint
)
result.value = self.client.unwrap_binary(result.value)
@@ -282,9 +313,9 @@ class Cache(BaseCache):
"""
conn = self._get_best_node()
if keys:
- return cache_clear_keys(conn, self._cache_id, keys)
+ return cache_clear_keys(conn, self.cache_info, keys)
else:
- return cache_clear(conn, self._cache_id)
+ return cache_clear(conn, self.cache_info)
@status_to_exception(CacheError)
def clear_key(self, key, key_hint: object = None):
@@ -300,7 +331,7 @@ class Cache(BaseCache):
return cache_clear_key(
self._get_best_node(key, key_hint),
- self._cache_id,
+ self.cache_info,
key,
key_hint=key_hint
)
@@ -313,7 +344,7 @@ class Cache(BaseCache):
:param keys: a list of keys or (key, type hint) tuples
"""
- return cache_clear_keys(self._get_best_node(), self._cache_id, keys)
+ return cache_clear_keys(self._get_best_node(), self.cache_info, keys)
@status_to_exception(CacheError)
def contains_key(self, key, key_hint=None) -> bool:
@@ -330,7 +361,7 @@ class Cache(BaseCache):
return cache_contains_key(
self._get_best_node(key, key_hint),
- self._cache_id,
+ self.cache_info,
key,
key_hint=key_hint
)
@@ -343,7 +374,7 @@ class Cache(BaseCache):
:param keys: a list of keys or (key, type hint) tuples,
:return: boolean `True` when all keys are present, `False` otherwise.
"""
- return cache_contains_keys(self._get_best_node(), self._cache_id, keys)
+ return cache_contains_keys(self._get_best_node(), self.cache_info, keys)
@status_to_exception(CacheError)
def get_and_put(self, key, value, key_hint=None, value_hint=None) -> Any:
@@ -364,7 +395,7 @@ class Cache(BaseCache):
result = cache_get_and_put(
self._get_best_node(key, key_hint),
- self._cache_id,
+ self.cache_info,
key, value,
key_hint, value_hint
)
@@ -392,7 +423,7 @@ class Cache(BaseCache):
result = cache_get_and_put_if_absent(
self._get_best_node(key, key_hint),
- self._cache_id,
+ self.cache_info,
key, value,
key_hint, value_hint
)
@@ -417,7 +448,7 @@ class Cache(BaseCache):
return cache_put_if_absent(
self._get_best_node(key, key_hint),
- self._cache_id,
+ self.cache_info,
key, value,
key_hint, value_hint
)
@@ -437,7 +468,7 @@ class Cache(BaseCache):
result = cache_get_and_remove(
self._get_best_node(key, key_hint),
- self._cache_id,
+ self.cache_info,
key,
key_hint
)
@@ -466,7 +497,7 @@ class Cache(BaseCache):
result = cache_get_and_replace(
self._get_best_node(key, key_hint),
- self._cache_id,
+ self.cache_info,
key, value,
key_hint, value_hint
)
@@ -486,7 +517,7 @@ class Cache(BaseCache):
key_hint = AnyDataObject.map_python_type(key)
return cache_remove_key(
- self._get_best_node(key, key_hint), self._cache_id, key, key_hint
+ self._get_best_node(key, key_hint), self.cache_info, key, key_hint
)
@status_to_exception(CacheError)
@@ -498,7 +529,7 @@ class Cache(BaseCache):
:param keys: list of keys or tuples of (key, key_hint) to remove.
"""
return cache_remove_keys(
- self._get_best_node(), self._cache_id, keys
+ self._get_best_node(), self.cache_info, keys
)
@status_to_exception(CacheError)
@@ -506,7 +537,7 @@ class Cache(BaseCache):
"""
Removes all cache entries, notifying listeners and cache writers.
"""
- return cache_remove_all(self._get_best_node(), self._cache_id)
+ return cache_remove_all(self._get_best_node(), self.cache_info)
@status_to_exception(CacheError)
def remove_if_equals(self, key, sample, key_hint=None, sample_hint=None):
@@ -526,7 +557,7 @@ class Cache(BaseCache):
return cache_remove_if_equals(
self._get_best_node(key, key_hint),
- self._cache_id,
+ self.cache_info,
key, sample,
key_hint, sample_hint
)
@@ -556,7 +587,7 @@ class Cache(BaseCache):
result = cache_replace_if_equals(
self._get_best_node(key, key_hint),
- self._cache_id,
+ self.cache_info,
key, sample, value,
key_hint, sample_hint, value_hint
)
@@ -574,7 +605,7 @@ class Cache(BaseCache):
:return: integer number of cache entries.
"""
return cache_get_size(
- self._get_best_node(), self._cache_id, peek_modes
+ self._get_best_node(), self.cache_info, peek_modes
)
def scan(self, page_size: int = 1, partitions: int = -1, local: bool = False) -> ScanCursor:
@@ -590,7 +621,7 @@ class Cache(BaseCache):
on local node only. Defaults to False,
:return: Scan query cursor.
"""
- return ScanCursor(self.client, self._cache_id, page_size, partitions, local)
+ return ScanCursor(self.client, self.cache_info, page_size, partitions, local)
def select_row(
self, query_str: str, page_size: int = 1,
@@ -621,5 +652,5 @@ class Cache(BaseCache):
if not type_name:
raise SQLError('Value type is unknown')
- return SqlCursor(self.client, self._cache_id, type_name, query_str, page_size, query_args,
+ return SqlCursor(self.client, self.cache_info, type_name, query_str, page_size, query_args,
distributed_joins, replicated_only, local, timeout)
diff --git a/pyignite/client.py b/pyignite/client.py
index 17e9d80..099b44d 100644
--- a/pyignite/client.py
+++ b/pyignite/client.py
@@ -58,6 +58,7 @@ from .datatypes import BinaryObject, AnyDataObject
from .datatypes.base import IgniteDataType
from .datatypes.internal import tc_map
from .exceptions import BinaryTypeError, CacheError, ReconnectError, connection_errors
+from .queries.query import CacheInfo
from .stream import BinaryStream, READ_BACKWARD
from .utils import (
cache_id, capitalize, entity_id, schema_id, process_delimiter, status_to_exception, is_iterable, is_wrapped,
@@ -719,20 +720,24 @@ class Client(BaseClient):
:return: sql fields cursor with result rows as a lists. If
`include_field_names` was set, the first row will hold field names.
"""
+ if isinstance(cache, (int, str)):
+ c_info = CacheInfo(cache_id=cache_id(cache), protocol_context=self.protocol_context)
+ elif isinstance(cache, Cache):
+ c_info = cache.cache_info
+ else:
+ c_info = None
- c_id = cache.cache_id if isinstance(cache, Cache) else cache_id(cache)
-
- if c_id != 0:
+ if c_info:
schema = None
- return SqlFieldsCursor(self, c_id, query_str, page_size, query_args, schema, statement_type, distributed_joins,
- local, replicated_only, enforce_join_order, collocated, lazy, include_field_names,
- max_rows, timeout)
+ return SqlFieldsCursor(self, c_info, query_str, page_size, query_args, schema, statement_type,
+ distributed_joins, local, replicated_only, enforce_join_order, collocated, lazy,
+ include_field_names, max_rows, timeout)
def get_cluster(self) -> 'Cluster':
"""
- Gets client cluster facade.
+ Get client cluster facade.
- :return: Client cluster facade.
+ :return: :py:class:`~pyignite.cluster.Cluster` instance.
"""
return Cluster(self)
diff --git a/pyignite/cluster.py b/pyignite/cluster.py
index f10afe4..d953b5c 100644
--- a/pyignite/cluster.py
+++ b/pyignite/cluster.py
@@ -20,6 +20,7 @@ whole cluster.
from pyignite.api.cluster import cluster_get_state, cluster_set_state
from pyignite.exceptions import ClusterError
from pyignite.utils import status_to_exception
+from pyignite.datatypes import ClusterState
class Cluster:
@@ -30,27 +31,34 @@ class Cluster:
"""
def __init__(self, client: 'Client'):
+ """
+ :param client: :py:class:`~pyignite.client.Client` instance.
+ """
self._client = client
@status_to_exception(ClusterError)
- def get_state(self):
+ def get_state(self) -> 'ClusterState':
"""
Gets current cluster state.
- :return: Current cluster state. This is one of ClusterState.INACTIVE,
- ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
+ :return: Current cluster state. This is one of
+ :py:attr:`~pyignite.datatypes.cluster_state.ClusterState.INACTIVE`,
+ :py:attr:`~pyignite.datatypes.cluster_state.ClusterState.ACTIVE`,
+ :py:attr:`~pyignite.datatypes.cluster_state.ClusterState.ACTIVE_READ_ONLY`.
"""
return cluster_get_state(self._client.random_node)
@status_to_exception(ClusterError)
- def set_state(self, state):
+ def set_state(self, state: 'ClusterState'):
"""
Changes current cluster state to the given.
Note: Deactivation clears in-memory caches (without persistence)
including the system caches.
- :param state: New cluster state. This is one of ClusterState.INACTIVE,
- ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
+ :param state: New cluster state. This is one of
+ :py:attr:`~pyignite.datatypes.cluster_state.ClusterState.INACTIVE`,
+ :py:attr:`~pyignite.datatypes.cluster_state.ClusterState.ACTIVE`,
+ :py:attr:`~pyignite.datatypes.cluster_state.ClusterState.ACTIVE_READ_ONLY`.
"""
return cluster_set_state(self._client.random_node, state)
diff --git a/pyignite/connection/protocol_context.py b/pyignite/connection/protocol_context.py
index 54f5240..be23e56 100644
--- a/pyignite/connection/protocol_context.py
+++ b/pyignite/connection/protocol_context.py
@@ -98,3 +98,6 @@ class ProtocolContext:
Check whether cluster API supported by the current protocol.
"""
return self.features and BitmaskFeature.CLUSTER_API in self.features
+
+ def is_expiry_policy_supported(self) -> bool:
+ """
+ Check whether expiry policy is supported by the current protocol,
+ i.e. whether the protocol version is 1.6.0 or newer.
+ """
+ return self.version >= (1, 6, 0)
diff --git a/pyignite/cursors.py b/pyignite/cursors.py
index 0a8f0b0..a690d94 100644
--- a/pyignite/cursors.py
+++ b/pyignite/cursors.py
@@ -64,15 +64,15 @@ class BaseCursorMixin:
setattr(self, '_more', value)
@property
- def cache_id(self):
+ def cache_info(self):
"""
- Cache id.
+ Cache meta info.
"""
- return getattr(self, '_cache_id', None)
+ return getattr(self, '_cache_info', None)
- @cache_id.setter
- def cache_id(self, value):
- setattr(self, '_cache_id', value)
+ @cache_info.setter
+ def cache_info(self, value):
+ setattr(self, '_cache_info', value)
@property
def client(self):
@@ -134,9 +134,9 @@ class AioCursorMixin(BaseCursorMixin):
class AbstractScanCursor:
- def __init__(self, client, cache_id, page_size, partitions, local):
+ def __init__(self, client, cache_info, page_size, partitions, local):
self.client = client
- self.cache_id = cache_id
+ self.cache_info = cache_info
self._page_size = page_size
self._partitions = partitions
self._local = local
@@ -159,18 +159,18 @@ class ScanCursor(AbstractScanCursor, CursorMixin):
"""
Synchronous scan cursor.
"""
- def __init__(self, client, cache_id, page_size, partitions, local):
+ def __init__(self, client, cache_info, page_size, partitions, local):
"""
:param client: Synchronous Apache Ignite client.
- :param cache_id: Cache id.
+ :param cache_info: Cache meta info.
:param page_size: page size.
:param partitions: number of partitions to query (negative to query entire cache).
:param local: pass True if this query should be executed on local node only.
"""
- super().__init__(client, cache_id, page_size, partitions, local)
+ super().__init__(client, cache_info, page_size, partitions, local)
self.connection = self.client.random_node
- result = scan(self.connection, self.cache_id, self._page_size, self._partitions, self._local)
+ result = scan(self.connection, self.cache_info, self._page_size, self._partitions, self._local)
self._finalize_init(result)
def __next__(self):
@@ -193,20 +193,20 @@ class AioScanCursor(AbstractScanCursor, AioCursorMixin):
"""
Asynchronous scan query cursor.
"""
- def __init__(self, client, cache_id, page_size, partitions, local):
+ def __init__(self, client, cache_info, page_size, partitions, local):
"""
:param client: Asynchronous Apache Ignite client.
- :param cache_id: Cache id.
+ :param cache_info: Cache meta info.
:param page_size: page size.
:param partitions: number of partitions to query (negative to query entire cache).
:param local: pass True if this query should be executed on local node only.
"""
- super().__init__(client, cache_id, page_size, partitions, local)
+ super().__init__(client, cache_info, page_size, partitions, local)
async def __aenter__(self):
if not self.connection:
self.connection = await self.client.random_node()
- result = await scan_async(self.connection, self.cache_id, self._page_size, self._partitions, self._local)
+ result = await scan_async(self.connection, self.cache_info, self._page_size, self._partitions, self._local)
self._finalize_init(result)
return self
@@ -238,15 +238,15 @@ class SqlCursor(CursorMixin):
"""
Synchronous SQL query cursor.
"""
- def __init__(self, client, cache_id, *args, **kwargs):
+ def __init__(self, client, cache_info, *args, **kwargs):
"""
:param client: Synchronous Apache Ignite client.
- :param cache_id: Cache id.
+ :param cache_info: Cache meta info.
"""
self.client = client
- self.cache_id = cache_id
+ self.cache_info = cache_info
self.connection = self.client.random_node
- result = sql(self.connection, self.cache_id, *args, **kwargs)
+ result = sql(self.connection, self.cache_info, *args, **kwargs)
if result.status != 0:
raise SQLError(result.message)
@@ -274,9 +274,9 @@ class SqlCursor(CursorMixin):
class AbstractSqlFieldsCursor:
- def __init__(self, client, cache_id):
+ def __init__(self, client, cache_info):
self.client = client
- self.cache_id = cache_id
+ self.cache_info = cache_info
def _finalize_init(self, result):
if result.status != 0:
@@ -295,14 +295,14 @@ class SqlFieldsCursor(AbstractSqlFieldsCursor, CursorMixin):
"""
Synchronous SQL fields query cursor.
"""
- def __init__(self, client, cache_id, *args, **kwargs):
+ def __init__(self, client, cache_info, *args, **kwargs):
"""
:param client: Synchronous Apache Ignite client.
- :param cache_id: Cache id.
+ :param cache_info: Cache meta info.
"""
- super().__init__(client, cache_id)
+ super().__init__(client, cache_info)
self.connection = self.client.random_node
- self._finalize_init(sql_fields(self.connection, self.cache_id, *args, **kwargs))
+ self._finalize_init(sql_fields(self.connection, self.cache_info, *args, **kwargs))
def __next__(self):
if not self.data:
@@ -334,12 +334,12 @@ class AioSqlFieldsCursor(AbstractSqlFieldsCursor, AioCursorMixin):
"""
Asynchronous SQL fields query cursor.
"""
- def __init__(self, client, cache_id, *args, **kwargs):
+ def __init__(self, client, cache_info, *args, **kwargs):
"""
- :param client: Synchronous Apache Ignite client.
+ :param client: Asynchronous Apache Ignite client.
- :param cache_id: Cache id.
+ :param cache_info: Cache meta info.
"""
- super().__init__(client, cache_id)
+ super().__init__(client, cache_info)
self._params = (args, kwargs)
async def __aenter__(self):
@@ -381,4 +381,4 @@ class AioSqlFieldsCursor(AbstractSqlFieldsCursor, AioCursorMixin):
return
self.connection = await self.client.random_node()
- self._finalize_init(await sql_fields_async(self.connection, self.cache_id, *args, **kwargs))
+ self._finalize_init(await sql_fields_async(self.connection, self.cache_info, *args, **kwargs))
diff --git a/pyignite/datatypes/__init__.py b/pyignite/datatypes/__init__.py
index 5024f79..4f78dce 100644
--- a/pyignite/datatypes/__init__.py
+++ b/pyignite/datatypes/__init__.py
@@ -25,3 +25,5 @@ from .primitive import *
from .primitive_arrays import *
from .primitive_objects import *
from .standard import *
+from .cluster_state import ClusterState
+from .expiry_policy import ExpiryPolicy
diff --git a/pyignite/datatypes/cache_config.py b/pyignite/datatypes/cache_config.py
index 04ff607..a2b4322 100644
--- a/pyignite/datatypes/cache_config.py
+++ b/pyignite/datatypes/cache_config.py
@@ -12,14 +12,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+from . import ExpiryPolicy
from .standard import String
from .internal import AnyDataObject, Struct, StructArray
from .primitive import *
__all__ = [
- 'cache_config_struct', 'CacheMode', 'PartitionLossPolicy',
+ 'get_cache_config_struct', 'CacheMode', 'PartitionLossPolicy',
'RebalanceMode', 'WriteSynchronizationMode', 'IndexType',
]
@@ -118,36 +118,40 @@ CacheKeyConfiguration = StructArray([
])
-cache_config_struct = Struct([
- ('length', Int),
- ('cache_atomicity_mode', CacheAtomicityMode),
- ('backups_number', Int),
- ('cache_mode', CacheMode),
- ('copy_on_read', Bool),
- ('data_region_name', String),
- ('eager_ttl', Bool),
- ('statistics_enabled', Bool),
- ('group_name', String),
- ('default_lock_timeout', Long),
- ('max_concurrent_async_operations', Int),
- ('max_query_iterators', Int),
- ('name', String),
- ('is_onheap_cache_enabled', Bool),
- ('partition_loss_policy', PartitionLossPolicy),
- ('query_detail_metric_size', Int),
- ('query_parallelism', Int),
- ('read_from_backup', Bool),
- ('rebalance_batch_size', Int),
- ('rebalance_batches_prefetch_count', Long),
- ('rebalance_delay', Long),
- ('rebalance_mode', RebalanceMode),
- ('rebalance_order', Int),
- ('rebalance_throttle', Long),
- ('rebalance_timeout', Long),
- ('sql_escape_all', Bool),
- ('sql_index_inline_max_size', Int),
- ('sql_schema', String),
- ('write_synchronization_mode', WriteSynchronizationMode),
- ('cache_key_configuration', CacheKeyConfiguration),
- ('query_entities', QueryEntities),
-])
+def get_cache_config_struct(protocol_context):
+ """
+ Build the cache configuration structure for the given protocol context.
+
+ :param protocol_context: protocol context; its
+ `is_expiry_policy_supported()` decides whether the trailing
+ `expiry_policy` field is included,
+ :return: `Struct` made of the cache configuration fields.
+ """
+ fields = [
+ ('length', Int),
+ ('cache_atomicity_mode', CacheAtomicityMode),
+ ('backups_number', Int),
+ ('cache_mode', CacheMode),
+ ('copy_on_read', Bool),
+ ('data_region_name', String),
+ ('eager_ttl', Bool),
+ ('statistics_enabled', Bool),
+ ('group_name', String),
+ ('default_lock_timeout', Long),
+ ('max_concurrent_async_operations', Int),
+ ('max_query_iterators', Int),
+ ('name', String),
+ ('is_onheap_cache_enabled', Bool),
+ ('partition_loss_policy', PartitionLossPolicy),
+ ('query_detail_metric_size', Int),
+ ('query_parallelism', Int),
+ ('read_from_backup', Bool),
+ ('rebalance_batch_size', Int),
+ ('rebalance_batches_prefetch_count', Long),
+ ('rebalance_delay', Long),
+ ('rebalance_mode', RebalanceMode),
+ ('rebalance_order', Int),
+ ('rebalance_throttle', Long),
+ ('rebalance_timeout', Long),
+ ('sql_escape_all', Bool),
+ ('sql_index_inline_max_size', Int),
+ ('sql_schema', String),
+ ('write_synchronization_mode', WriteSynchronizationMode),
+ ('cache_key_configuration', CacheKeyConfiguration),
+ ('query_entities', QueryEntities),
+ ]
+ # The expiry policy field is only part of the structure when the protocol supports it.
+ if protocol_context.is_expiry_policy_supported():
+ fields.append(('expiry_policy', ExpiryPolicy))
+ return Struct(fields=fields)
diff --git a/pyignite/datatypes/cache_properties.py b/pyignite/datatypes/cache_properties.py
index 9bf34de..a1766f3 100644
--- a/pyignite/datatypes/cache_properties.py
+++ b/pyignite/datatypes/cache_properties.py
@@ -15,6 +15,7 @@
import ctypes
+from . import ExpiryPolicy
from .prop_codes import *
from .cache_config import (
CacheMode, CacheAtomicityMode, PartitionLossPolicy, RebalanceMode,
@@ -34,7 +35,7 @@ __all__ = [
'PropRebalanceOrder', 'PropRebalanceThrottle', 'PropGroupName',
'PropCacheKeyConfiguration', 'PropDefaultLockTimeout',
'PropMaxConcurrentAsyncOperation', 'PropPartitionLossPolicy',
- 'PropEagerTTL', 'PropStatisticsEnabled', 'prop_map', 'AnyProperty',
+ 'PropEagerTTL', 'PropStatisticsEnabled', 'PropExpiryPolicy', 'prop_map', 'AnyProperty',
]
@@ -70,6 +71,7 @@ def prop_map(code: int):
PROP_PARTITION_LOSS_POLICY: PropPartitionLossPolicy,
PROP_EAGER_TTL: PropEagerTTL,
PROP_STATISTICS_ENABLED: PropStatisticsEnabled,
+ PROP_EXPIRY_POLICY: PropExpiryPolicy,
}[code]
@@ -285,6 +287,11 @@ class PropStatisticsEnabled(PropBase):
prop_data_class = Bool
+class PropExpiryPolicy(PropBase):
+ # Cache property with code PROP_EXPIRY_POLICY whose payload is an ExpiryPolicy.
+ prop_code = PROP_EXPIRY_POLICY
+ prop_data_class = ExpiryPolicy
+
+
class AnyProperty(PropBase):
@classmethod
diff --git a/pyignite/datatypes/cluster_state.py b/pyignite/datatypes/cluster_state.py
index 863a1d2..def5591 100644
--- a/pyignite/datatypes/cluster_state.py
+++ b/pyignite/datatypes/cluster_state.py
@@ -17,6 +17,10 @@ from enum import IntEnum
class ClusterState(IntEnum):
+ """
+ Cluster states.
+ """
+
#: Cluster deactivated. Cache operations aren't allowed.
INACTIVE = 0
diff --git a/pyignite/datatypes/expiry_policy.py b/pyignite/datatypes/expiry_policy.py
new file mode 100644
index 0000000..3572754
--- /dev/null
+++ b/pyignite/datatypes/expiry_policy.py
@@ -0,0 +1,110 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import ctypes
+from io import SEEK_CUR
+from typing import Union
+
+import attr
+
+from pyignite.constants import PROTOCOL_BYTE_ORDER
+
+
+def _positive(_, attrib, value):
+ """
+ Validator for the ExpiryPolicy TTL attributes: reject negative values,
+ except for the ExpiryPolicy.UNCHANGED and ExpiryPolicy.ETERNAL markers.
+ Zero is accepted.
+
+ :raises ValueError: if the value is negative and is not one of the markers.
+ """
+ if value < 0 and value not in [ExpiryPolicy.UNCHANGED, ExpiryPolicy.ETERNAL]:
+ raise ValueError(f"'{attrib.name}' value must not be negative")
+
+
+def _write_duration(stream, value):
+ """
+ Write a TTL value to the stream as a signed 8-byte integer.
+
+ :param stream: stream to write to,
+ :param value: TTL; a float is taken as seconds and converted to
+ milliseconds, an int is written as is. The ExpiryPolicy.UNCHANGED and
+ ExpiryPolicy.ETERNAL markers are never scaled.
+ """
+ # A marker given as a float (e.g. -1.0) passes validation but must keep its
+ # value: scaling it would produce -1000, which is not a marker anymore.
+ if isinstance(value, float) and value not in (ExpiryPolicy.UNCHANGED, ExpiryPolicy.ETERNAL):
+ value = int(value * 1000)
+
+ stream.write(int(value).to_bytes(8, byteorder=PROTOCOL_BYTE_ORDER, signed=True))
+
+
+@attr.s
+class ExpiryPolicy:
+ """
+ Set expiry policy for the cache.
+
+ Each TTL is either a non-negative duration, given in seconds (float) or
+ milliseconds (int), or one of the markers :py:attr:`UNCHANGED` and
+ :py:attr:`ETERNAL`.
+ """
+ #: Set TTL unchanged.
+ UNCHANGED = -2
+
+ #: Set TTL eternal.
+ ETERNAL = -1
+
+ #: Set TTL for create in seconds(float) or millis(int)
+ create = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, float],
+ validator=[attr.validators.instance_of((int, float)), _positive])
+
+ #: Set TTL for update in seconds(float) or millis(int)
+ update = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, float],
+ validator=[attr.validators.instance_of((int, float)), _positive])
+
+ #: Set TTL for access in seconds(float) or millis(int)
+ access = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, float],
+ validator=[attr.validators.instance_of((int, float)), _positive])
+
+ class _CType(ctypes.LittleEndianStructure):
+ # Layout of a non-null policy: presence byte followed by three
+ # 64-bit TTLs, packed without padding.
+ _pack_ = 1
+ _fields_ = [
+ ('not_null', ctypes.c_byte),
+ ('create', ctypes.c_longlong),
+ ('update', ctypes.c_longlong),
+ ('access', ctypes.c_longlong)
+ ]
+
+ @classmethod
+ def parse(cls, stream):
+ """
+ Skip over a serialized policy and tell which ctypes type describes it.
+
+ :param stream: stream positioned at the presence byte,
+ :return: `ExpiryPolicy._CType` when the presence byte is non-zero,
+ `ctypes.c_byte` otherwise.
+ """
+ init = stream.tell()
+ not_null = int.from_bytes(stream.slice(init, 1), byteorder=PROTOCOL_BYTE_ORDER)
+ if not_null:
+ stream.seek(ctypes.sizeof(ExpiryPolicy._CType), SEEK_CUR)
+ return ExpiryPolicy._CType
+ stream.seek(ctypes.sizeof(ctypes.c_byte), SEEK_CUR)
+ return ctypes.c_byte
+
+ @classmethod
+ async def parse_async(cls, stream):
+ """
+ Asynchronous counterpart of :py:meth:`parse`; delegates to it.
+ """
+ return cls.parse(stream)
+
+ @classmethod
+ def to_python(cls, ctypes_object):
+ """
+ Convert a parsed value to a Python object.
+
+ :param ctypes_object: parsed value,
+ :return: None if the value equals 0, otherwise an ExpiryPolicy built
+ from its `create`, `update` and `access` fields.
+ """
+ if ctypes_object == 0:
+ return None
+
+ return ExpiryPolicy(create=ctypes_object.create, update=ctypes_object.update, access=ctypes_object.access)
+
+ @classmethod
+ async def to_python_async(cls, ctypes_object):
+ """
+ Asynchronous counterpart of :py:meth:`to_python`; delegates to it.
+ """
+ return cls.to_python(ctypes_object)
+
+ @classmethod
+ def from_python(cls, stream, value):
+ """
+ Write a nullable policy: a single zero byte if the value is falsy,
+ otherwise a 0x01 byte followed by the three durations.
+
+ :param stream: stream to write to,
+ :param value: ExpiryPolicy instance or None.
+ """
+ if not value:
+ stream.write(b'\x00')
+ return
+
+ stream.write(b'\x01')
+ cls.write_policy(stream, value)
+
+ @classmethod
+ async def from_python_async(cls, stream, value):
+ """
+ Asynchronous counterpart of :py:meth:`from_python`; delegates to it.
+ """
+ return cls.from_python(stream, value)
+
+ @classmethod
+ def write_policy(cls, stream, value):
+ """
+ Write the create, update and access durations, in that order,
+ without a presence byte.
+
+ :param stream: stream to write to,
+ :param value: ExpiryPolicy instance.
+ """
+ _write_duration(stream, value.create)
+ _write_duration(stream, value.update)
+ _write_duration(stream, value.access)
diff --git a/pyignite/datatypes/prop_codes.py b/pyignite/datatypes/prop_codes.py
index 72ffce1..9709313 100644
--- a/pyignite/datatypes/prop_codes.py
+++ b/pyignite/datatypes/prop_codes.py
@@ -47,3 +47,4 @@ PROP_MAX_CONCURRENT_ASYNC_OPERATIONS = 403
PROP_PARTITION_LOSS_POLICY = 404
PROP_EAGER_TTL = 405
PROP_STATISTICS_ENABLED = 406
+PROP_EXPIRY_POLICY = 407
diff --git a/pyignite/queries/query.py b/pyignite/queries/query.py
index 8dac64f..d971eef 100644
--- a/pyignite/queries/query.py
+++ b/pyignite/queries/query.py
@@ -21,7 +21,10 @@ import attr
from pyignite.api.result import APIResult
from pyignite.connection import Connection, AioConnection
-from pyignite.constants import MIN_LONG, MAX_LONG, RHF_TOPOLOGY_CHANGED
+from pyignite.connection.protocol_context import ProtocolContext
+from pyignite.constants import MIN_LONG, MAX_LONG, RHF_TOPOLOGY_CHANGED, PROTOCOL_BYTE_ORDER
+from pyignite.datatypes import ExpiryPolicy
+from pyignite.exceptions import NotSupportedByClusterError
from pyignite.queries.response import Response
from pyignite.stream import AioBinaryStream, BinaryStream, READ_BACKWARD
@@ -45,6 +48,34 @@ def query_perform(query_struct, conn, post_process_fun=None, **kwargs):
@attr.s
+class CacheInfo:
+ # Cache meta info written at the start of a cache request: the cache id,
+ # an optional expiry policy and the protocol context used to check support.
+ cache_id = attr.ib(kw_only=True, type=int)
+ expiry_policy = attr.ib(kw_only=True, type=ExpiryPolicy, default=None)
+ protocol_context = attr.ib(kw_only=True, type=ProtocolContext)
+
+ @classmethod
+ async def from_python_async(cls, stream, value):
+ """
+ Asynchronous counterpart of :py:meth:`from_python`; delegates to it.
+ """
+ return cls.from_python(stream, value)
+
+ @classmethod
+ def from_python(cls, stream, value):
+ """
+ Write the cache id (4 bytes, signed), a flags byte and, when an expiry
+ policy is set, the policy durations.
+
+ :param stream: stream to write to,
+ :param value: CacheInfo instance; a falsy value is written as
+ cache id 0 without expiry policy,
+ :raises NotSupportedByClusterError: if an expiry policy is set but the
+ protocol context does not support it.
+ """
+ cache_id = value.cache_id if value else 0
+ expiry_policy = value.expiry_policy if value else None
+ flags = 0
+
+ stream.write(cache_id.to_bytes(4, byteorder=PROTOCOL_BYTE_ORDER, signed=True))
+
+ if expiry_policy:
+ if not value.protocol_context.is_expiry_policy_supported():
+ raise NotSupportedByClusterError("'ExpiryPolicy' API is not supported by the cluster")
+ # Flag bit announcing that policy durations follow the flags byte.
+ flags |= 0x04
+
+ stream.write(flags.to_bytes(1, byteorder=PROTOCOL_BYTE_ORDER))
+ if expiry_policy:
+ ExpiryPolicy.write_policy(stream, expiry_policy)
+
+
+@attr.s
class Query:
op_code = attr.ib(type=int)
following = attr.ib(type=list, factory=list)
diff --git a/tests/affinity/conftest.py b/tests/affinity/conftest.py
index e23e0e6..da645c1 100644
--- a/tests/affinity/conftest.py
+++ b/tests/affinity/conftest.py
@@ -66,7 +66,6 @@ async def async_client(connection_param):
@pytest.fixture(scope='module', autouse=True)
def skip_if_no_affinity(request, server1):
client = Client(partition_aware=True)
- client.connect('127.0.0.1', 10801)
-
- if not client.partition_awareness_supported_by_protocol:
- pytest.skip(f'skipped {request.node.name}, partition awareness is not supported.')
+ with client.connect('127.0.0.1', 10801):
+ if not client.partition_awareness_supported_by_protocol:
+ pytest.skip(f'skipped {request.node.name}, partition awareness is not supported.')
diff --git a/tests/affinity/test_affinity.py b/tests/affinity/test_affinity.py
index 64b9cc5..3097991 100644
--- a/tests/affinity/test_affinity.py
+++ b/tests/affinity/test_affinity.py
@@ -312,7 +312,7 @@ def __check_best_node_calculation(client, cache, key, value, key_hint=None):
best_node = client.get_best_node(cache, key, key_hint=key_hint)
for node in filter(lambda n: n.alive, client._nodes):
- result = cache_local_peek(node, cache.cache_id, key, key_hint=key_hint)
+ result = cache_local_peek(node, cache.cache_info, key, key_hint=key_hint)
check_peek_value(node, best_node, result)
@@ -321,7 +321,7 @@ def __check_best_node_calculation(client, cache, key, value, key_hint=None):
best_node = await client.get_best_node(cache, key, key_hint=key_hint)
for node in filter(lambda n: n.alive, client._nodes):
- result = await cache_local_peek_async(node, cache.cache_id, key, key_hint=key_hint)
+ result = await cache_local_peek_async(node, cache.cache_info, key, key_hint=key_hint)
check_peek_value(node, best_node, result)
diff --git a/tests/common/conftest.py b/tests/common/conftest.py
index 243d822..0f28f7e 100644
--- a/tests/common/conftest.py
+++ b/tests/common/conftest.py
@@ -70,3 +70,14 @@ def cache(client):
yield cache
finally:
cache.destroy()
+
+
+@pytest.fixture(autouse=True)
+def expiry_policy_supported(request, server1):
+ """
+ Tell whether the server supports the ExpiryPolicy API. Tests marked with
+ `skip_if_no_expiry_policy` are skipped when it does not.
+
+ :return: result of `is_expiry_policy_supported()` for the connected client.
+ """
+ client = Client()
+ with client.connect('127.0.0.1', 10801):
+ result = client.protocol_context.is_expiry_policy_supported()
+ if not result and request.node.get_closest_marker('skip_if_no_expiry_policy'):
+ pytest.skip(f'skipped {request.node.name}, ExpiryPolicy API is not supported.')
+
+ return result
diff --git a/tests/common/test_binary.py b/tests/common/test_binary.py
index 1d7192f..c94c4d5 100644
--- a/tests/common/test_binary.py
+++ b/tests/common/test_binary.py
@@ -74,7 +74,7 @@ def table_cache_read(client):
cache = client.get_cache(table_cache_name)
yield cache
- cache.destroy()
+ client.sql(drop_query)
@pytest.fixture
@@ -87,7 +87,7 @@ async def table_cache_read_async(async_client):
cache = await async_client.get_cache(table_cache_name)
yield cache
- await cache.destroy()
+ await async_client.sql(drop_query)
def test_sql_read_as_binary(table_cache_read):
diff --git a/tests/common/test_cache_config.py b/tests/common/test_cache_config.py
index e68eef5..e5ed33c 100644
--- a/tests/common/test_cache_config.py
+++ b/tests/common/test_cache_config.py
@@ -28,7 +28,7 @@ from pyignite.datatypes.prop_codes import (
PROP_PARTITION_LOSS_POLICY, PROP_EAGER_TTL, PROP_STATISTICS_ENABLED, PROP_REBALANCE_MODE, PROP_REBALANCE_DELAY,
PROP_REBALANCE_TIMEOUT, PROP_REBALANCE_BATCH_SIZE, PROP_REBALANCE_BATCHES_PREFETCH_COUNT, PROP_REBALANCE_ORDER,
PROP_REBALANCE_THROTTLE, PROP_QUERY_ENTITIES, PROP_QUERY_PARALLELISM, PROP_QUERY_DETAIL_METRIC_SIZE,
- PROP_SQL_SCHEMA, PROP_SQL_INDEX_INLINE_MAX_SIZE, PROP_SQL_ESCAPE_ALL, PROP_MAX_QUERY_ITERATORS
+ PROP_SQL_SCHEMA, PROP_SQL_INDEX_INLINE_MAX_SIZE, PROP_SQL_ESCAPE_ALL, PROP_MAX_QUERY_ITERATORS, PROP_EXPIRY_POLICY
)
from pyignite.exceptions import CacheError
@@ -36,8 +36,8 @@ cache_name = 'config_cache'
@pytest.fixture
-def test_cache_settings():
- return {
+def test_cache_settings(expiry_policy_supported):
+ settings = {
PROP_NAME: cache_name,
PROP_CACHE_MODE: CacheMode.PARTITIONED,
PROP_CACHE_ATOMICITY_MODE: CacheAtomicityMode.TRANSACTIONAL,
@@ -96,6 +96,13 @@ def test_cache_settings():
PROP_STATISTICS_ENABLED: True
}
+ if expiry_policy_supported:
+ settings[PROP_EXPIRY_POLICY] = None
+ elif 'PROP_EXPIRY_POLICY' in ALL_PROPS:
+ del ALL_PROPS['PROP_EXPIRY_POLICY']
+
+ return settings
+
@pytest.fixture
def cache(client):
diff --git a/tests/common/test_expiry_policy.py b/tests/common/test_expiry_policy.py
new file mode 100644
index 0000000..cc852c7
--- /dev/null
+++ b/tests/common/test_expiry_policy.py
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import asyncio
+import time
+
+import pytest
+
+from pyignite.datatypes import ExpiryPolicy
+from pyignite.datatypes.prop_codes import PROP_NAME, PROP_EXPIRY_POLICY
+
+
+@pytest.mark.skip_if_no_expiry_policy
+def test_expiry_policy(cache):
+ """
+ Check eternal, create, update and access expiry policies on a synchronous cache.
+
+ A timing-sensitive check is skipped via `continue` when it took `ttl` or
+ longer since `start`, as its result would not be reliable.
+ """
+ ttl, num_retries = 0.6, 10
+ cache_eternal = cache.with_expire_policy(create=ExpiryPolicy.ETERNAL)
+ cache_created = cache.with_expire_policy(create=0.6)
+ cache_updated = cache.with_expire_policy(update=0.6)
+ cache_accessed = cache.with_expire_policy(access=0.6)
+
+ for _ in range(num_retries):
+ cache.clear()
+
+ start = time.time()
+
+ cache_eternal.put(0, 0)
+ cache_created.put(1, 1)
+ cache_updated.put(2, 2)
+ cache_accessed.put(3, 3)
+
+ time.sleep(ttl * 2 / 3)
+
+ result = [cache.contains_key(k) for k in range(4)]
+
+ if time.time() - start >= ttl:
+ continue
+
+ assert all(result)
+
+ start = time.time()
+
+ cache_created.put(1, 2) # Check that update doesn't matter for created policy
+ cache_created.get(1) # Check that access doesn't matter for created policy
+ cache_updated.put(2, 3) # Check that update policy works.
+ cache_accessed.get(3) # Check that access policy works.
+
+ time.sleep(ttl * 2 / 3)
+
+ result = [cache.contains_key(k) for k in range(4)]
+
+ if time.time() - start >= ttl:
+ continue
+
+ assert result == [True, False, True, True]
+
+ time.sleep(ttl * 2 / 3)
+
+ cache_updated.get(2) # Check that access doesn't matter for updated policy.
+
+ time.sleep(ttl * 2 / 3)
+
+ result = [cache.contains_key(k) for k in range(0, 4)]
+ assert result == [True, False, False, False]
+
+
+@pytest.mark.asyncio
+@pytest.mark.skip_if_no_expiry_policy
+async def test_expiry_policy_async(async_cache):
+ """
+ Check eternal, create, update and access expiry policies on an asynchronous cache.
+
+ A timing-sensitive check is skipped via `continue` when it took `ttl` or
+ longer since `start`, as its result would not be reliable.
+ """
+ ttl, num_retries = 0.6, 10
+ cache_eternal = async_cache.with_expire_policy(create=ExpiryPolicy.ETERNAL)
+ cache_created = async_cache.with_expire_policy(create=0.6)
+ cache_updated = async_cache.with_expire_policy(update=0.6)
+ cache_accessed = async_cache.with_expire_policy(access=0.6)
+
+ for _ in range(num_retries):
+ await async_cache.clear()
+
+ start = time.time()
+
+ await asyncio.gather(
+ cache_eternal.put(0, 0),
+ cache_created.put(1, 1),
+ cache_updated.put(2, 2),
+ cache_accessed.put(3, 3)
+ )
+
+ await asyncio.sleep(ttl * 2 / 3)
+
+ result = await asyncio.gather(*[async_cache.contains_key(k) for k in range(4)])
+
+ if time.time() - start >= ttl:
+ continue
+
+ assert all(result)
+
+ start = time.time()
+
+ await asyncio.gather(
+ cache_created.put(1, 2), # Check that update doesn't matter for created policy
+ cache_created.get(1), # Check that access doesn't matter for created policy
+ cache_updated.put(2, 3), # Check that update policy works.
+ cache_accessed.get(3) # Check that access policy works.
+ )
+
+ await asyncio.sleep(ttl * 2 / 3)
+
+ result = await asyncio.gather(*[async_cache.contains_key(k) for k in range(4)])
+
+ if time.time() - start >= ttl:
+ continue
+
+ assert result == [True, False, True, True]
+
+ await asyncio.sleep(ttl * 2 / 3)
+
+ # The call must be awaited, otherwise the coroutine is never executed
+ # and the access is never actually performed.
+ await cache_updated.get(2) # Check that access doesn't matter for updated policy.
+
+ await asyncio.sleep(ttl * 2 / 3)
+
+ result = await asyncio.gather(*[async_cache.contains_key(k) for k in range(4)])
+ assert result == [True, False, False, False]
+
+# Arguments for pytest.mark.parametrize shared by the cache creation tests:
+# no policy, default policy, eternal create TTL and explicit TTLs given as ints.
+create_cache_with_expiry_params = (
+ 'expiry_policy',
+ [
+ None,
+ ExpiryPolicy(),
+ ExpiryPolicy(create=ExpiryPolicy.ETERNAL),
+ ExpiryPolicy(create=2000, update=4000, access=6000)
+ ]
+)
+
+
+@pytest.mark.parametrize(*create_cache_with_expiry_params)
+@pytest.mark.skip_if_no_expiry_policy
+def test_create_cache_with_expiry_policy(client, expiry_policy):
+ """
+ A cache created with PROP_EXPIRY_POLICY must report the same policy in its settings.
+ """
+ cache = client.create_cache({
+ PROP_NAME: 'expiry_cache',
+ PROP_EXPIRY_POLICY: expiry_policy
+ })
+ try:
+ settings = cache.settings
+ assert settings[PROP_EXPIRY_POLICY] == expiry_policy
+ finally:
+ cache.destroy()
+
+
+@pytest.mark.parametrize(*create_cache_with_expiry_params)
+@pytest.mark.skip_if_no_expiry_policy
+@pytest.mark.asyncio
+async def test_create_cache_with_expiry_policy_async(async_client, expiry_policy):
+ """
+ A cache created with PROP_EXPIRY_POLICY must report the same policy in its
+ settings (asynchronous client).
+ """
+ cache = await async_client.create_cache({
+ PROP_NAME: 'expiry_cache',
+ PROP_EXPIRY_POLICY: expiry_policy
+ })
+ try:
+ settings = await cache.settings()
+ assert settings[PROP_EXPIRY_POLICY] == expiry_policy
+ finally:
+ await cache.destroy()
diff --git a/tests/conftest.py b/tests/conftest.py
index 65134fd..1c65356 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -64,6 +64,7 @@ def pytest_addoption(parser):
def pytest_configure(config):
marker_docs = [
"skip_if_no_cext: mark test to run only if c extension is available",
+ "skip_if_no_expiry_policy: mark test to run only if expiry policy is supported by server",
"examples: mark test to run only if --examples are set"
]
diff --git a/tests/custom/test_cluster.py b/tests/custom/test_cluster.py
index e82e238..f1ffcfd 100644
--- a/tests/custom/test_cluster.py
+++ b/tests/custom/test_cluster.py
@@ -19,7 +19,7 @@ from pyignite import Client, AioClient
from pyignite.exceptions import CacheError
from tests.util import clear_ignite_work_dir, start_ignite_gen
-from pyignite.datatypes.cluster_state import ClusterState
+from pyignite.datatypes import ClusterState
@pytest.fixture(params=['with-persistence', 'without-persistence'])
@@ -44,6 +44,15 @@ def server2(with_persistence, cleanup):
yield from start_ignite_gen(idx=2, use_persistence=with_persistence)
+@pytest.fixture(autouse=True)
+def cluster_api_supported(request, server1):
+ """
+ Skip the current test when the cluster API is not supported by the server.
+ """
+ client = Client()
+ # Scope the connection with a context manager, as the other fixtures do.
+ with client.connect('127.0.0.1', 10801):
+ if not client.protocol_context.is_cluster_api_supported():
+ pytest.skip(f'skipped {request.node.name}, Cluster API is not supported.')
+
+
def test_cluster_set_active(with_persistence):
key = 42
val = 42