You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by is...@apache.org on 2018/10/15 10:29:54 UTC
[3/6] ignite git commit: IGNITE-7782 Python thin client
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e547b13/modules/platforms/python/pyignite/cache.py
----------------------------------------------------------------------
diff --git a/modules/platforms/python/pyignite/cache.py b/modules/platforms/python/pyignite/cache.py
new file mode 100644
index 0000000..6cd7377
--- /dev/null
+++ b/modules/platforms/python/pyignite/cache.py
@@ -0,0 +1,595 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Any, Iterable, Optional, Union
+
+from .datatypes import prop_codes
+from .exceptions import (
+ CacheCreationError, CacheError, ParameterError, SQLError,
+)
+from .utils import cache_id, is_wrapped, status_to_exception, unwrap_binary
+from .api.cache_config import (
+ cache_create, cache_create_with_config,
+ cache_get_or_create, cache_get_or_create_with_config,
+ cache_destroy, cache_get_configuration,
+)
+from .api.key_value import (
+ cache_get, cache_put, cache_get_all, cache_put_all, cache_replace,
+ cache_clear, cache_clear_key, cache_clear_keys,
+ cache_contains_key, cache_contains_keys,
+ cache_get_and_put, cache_get_and_put_if_absent, cache_put_if_absent,
+ cache_get_and_remove, cache_get_and_replace,
+ cache_remove_key, cache_remove_keys, cache_remove_all,
+ cache_remove_if_equals, cache_replace_if_equals, cache_get_size,
+)
+from .api.sql import scan, scan_cursor_get_page, sql, sql_cursor_get_page
+
+
+PROP_CODES = set([
+ getattr(prop_codes, x)
+ for x in dir(prop_codes)
+ if x.startswith('PROP_')
+])
+CACHE_CREATE_FUNCS = {
+ True: {
+ True: cache_get_or_create_with_config,
+ False: cache_create_with_config,
+ },
+ False: {
+ True: cache_get_or_create,
+ False: cache_create,
+ },
+}
+
+
+class Cache:
+ """
+ Ignite cache abstraction. Users should never use this class directly,
+ but construct its instances with
+ :py:meth:`~pyignite.client.Client.create_cache`,
+ :py:meth:`~pyignite.client.Client.get_or_create_cache` or
+ :py:meth:`~pyignite.client.Client.get_cache` methods instead. See
+ :ref:`this example <create_cache>` on how to do it.
+ """
+ _cache_id = None
+ _name = None
+ _client = None
+ _settings = None
+
+ @staticmethod
+ def _validate_settings(
+ settings: Union[str, dict]=None, get_only: bool=False,
+ ):
+ if any([
+ not settings,
+ type(settings) not in (str, dict),
+ type(settings) is dict and prop_codes.PROP_NAME not in settings,
+ ]):
+ raise ParameterError('You should supply at least cache name')
+
+ if all([
+ type(settings) is dict,
+ not set(settings).issubset(PROP_CODES),
+ ]):
+ raise ParameterError('One or more settings was not recognized')
+
+ if get_only and type(settings) is dict and len(settings) != 1:
+ raise ParameterError('Only cache name allowed as a parameter')
+
+ def __init__(
+ self, client: 'Client', settings: Union[str, dict]=None,
+ with_get: bool=False, get_only: bool=False,
+ ):
+ """
+ Initialize cache object.
+
+ :param client: Ignite client,
+ :param settings: cache settings. Can be a string (cache name) or a dict
+ of cache properties and their values. In this case PROP_NAME is
+ mandatory,
+ :param with_get: (optional) do not raise exception, if the cache
+ is already exists. Defaults to False,
+ :param get_only: (optional) do not communicate with Ignite server
+ at all, only create Cache instance. Defaults to False.
+ """
+ self._client = client
+ self._validate_settings(settings)
+ if type(settings) == str:
+ self._name = settings
+ else:
+ self._name = settings[prop_codes.PROP_NAME]
+
+ if not get_only:
+ func = CACHE_CREATE_FUNCS[type(settings) is dict][with_get]
+ result = func(client, settings)
+ if result.status != 0:
+ raise CacheCreationError(result.message)
+
+ self._cache_id = cache_id(self._name)
+
+ @property
+ def settings(self) -> Optional[dict]:
+ """
+ Lazy Cache settings. See the :ref:`example <sql_cache_read>`
+ of reading this property.
+
+ All cache properties are documented here: :ref:`cache_props`.
+
+ :return: dict of cache properties and their values.
+ """
+ if self._settings is None:
+ config_result = cache_get_configuration(self._client, self._cache_id)
+ if config_result.status == 0:
+ self._settings = config_result.value
+ else:
+ raise CacheError(config_result.message)
+
+ return self._settings
+
+ @property
+ def name(self) -> str:
+ """
+ Lazy cache name.
+
+ :return: cache name string.
+ """
+ if self._name is None:
+ self._name = self.settings[prop_codes.PROP_NAME]
+
+ return self._name
+
+ @property
+ def client(self) -> 'Client':
+ """
+ Ignite :class:`~pyignite.client.Client` object.
+
+ :return: Client object, through which the cache is accessed.
+ """
+ return self._client
+
+ @property
+ def cache_id(self) -> int:
+ """
+ Cache ID.
+
+ :return: integer value of the cache ID.
+ """
+ return self._cache_id
+
+ def _process_binary(self, value: Any) -> Any:
+ """
+ Detects and recursively unwraps Binary Object.
+
+ :param value: anything that could be a Binary Object,
+ :return: the result of the Binary Object unwrapping with all other data
+ left intact.
+ """
+ if is_wrapped(value):
+ return unwrap_binary(self._client, value)
+ return value
+
+ @status_to_exception(CacheError)
+ def destroy(self):
+ """
+ Destroys cache with a given name.
+ """
+ return cache_destroy(self._client, self._cache_id)
+
+ @status_to_exception(CacheError)
+ def get(self, key, key_hint: object=None) -> Any:
+ """
+ Retrieves a value from cache by key.
+
+ :param key: key for the cache entry. Can be of any supported type,
+ :param key_hint: (optional) Ignite data type, for which the given key
+ should be converted,
+ :return: value retrieved.
+ """
+ result = cache_get(self._client, self._cache_id, key, key_hint=key_hint)
+ result.value = self._process_binary(result.value)
+ return result
+
+ @status_to_exception(CacheError)
+ def put(self, key, value, key_hint: object=None, value_hint: object=None):
+ """
+ Puts a value with a given key to cache (overwriting existing value
+ if any).
+
+ :param key: key for the cache entry. Can be of any supported type,
+ :param value: value for the key,
+ :param key_hint: (optional) Ignite data type, for which the given key
+ should be converted,
+ :param value_hint: (optional) Ignite data type, for which the given
+ value should be converted.
+ """
+ return cache_put(
+ self._client, self._cache_id, key, value,
+ key_hint=key_hint, value_hint=value_hint
+ )
+
+ @status_to_exception(CacheError)
+ def get_all(self, keys: list) -> list:
+ """
+ Retrieves multiple key-value pairs from cache.
+
+ :param keys: list of keys or tuples of (key, key_hint),
+ :return: a dict of key-value pairs.
+ """
+ result = cache_get_all(self._client, self._cache_id, keys)
+ if result.value:
+ for key, value in result.value.items():
+ result.value[key] = self._process_binary(value)
+ return result
+
+ @status_to_exception(CacheError)
+ def put_all(self, pairs: dict):
+ """
+ Puts multiple key-value pairs to cache (overwriting existing
+ associations if any).
+
+ :param pairs: dictionary type parameters, contains key-value pairs
+ to save. Each key or value can be an item of representable
+ Python type or a tuple of (item, hint),
+ """
+ return cache_put_all(self._client, self._cache_id, pairs)
+
+ @status_to_exception(CacheError)
+ def replace(
+ self, key, value, key_hint: object=None, value_hint: object=None
+ ):
+ """
+ Puts a value with a given key to cache only if the key already exists.
+
+ :param key: key for the cache entry. Can be of any supported type,
+ :param value: value for the key,
+ :param key_hint: (optional) Ignite data type, for which the given key
+ should be converted,
+ :param value_hint: (optional) Ignite data type, for which the given
+ value should be converted.
+ """
+ result = cache_replace(
+ self._client, self._cache_id, key, value,
+ key_hint=key_hint, value_hint=value_hint
+ )
+ result.value = self._process_binary(result.value)
+ return result
+
+ @status_to_exception(CacheError)
+ def clear(self, keys: Optional[list]=None):
+ """
+ Clears the cache without notifying listeners or cache writers.
+
+ :param keys: (optional) list of cache keys or (key, key type
+ hint) tuples to clear (default: clear all).
+ """
+ if keys:
+ return cache_clear_keys(self._client, self._cache_id, keys)
+ else:
+ return cache_clear(self._client, self._cache_id)
+
+ @status_to_exception(CacheError)
+ def clear_key(self, key, key_hint: object=None):
+ """
+ Clears the cache key without notifying listeners or cache writers.
+
+ :param key: key for the cache entry,
+ :param key_hint: (optional) Ignite data type, for which the given key
+ should be converted,
+ """
+ return cache_clear_key(
+ self._client, self._cache_id, key, key_hint=key_hint
+ )
+
+ @status_to_exception(CacheError)
+ def contains_key(self, key, key_hint=None) -> bool:
+ """
+ Returns a value indicating whether given key is present in cache.
+
+ :param key: key for the cache entry. Can be of any supported type,
+ :param key_hint: (optional) Ignite data type, for which the given key
+ should be converted,
+ :return: boolean `True` when key is present, `False` otherwise.
+ """
+ return cache_contains_key(
+ self._client, self._cache_id, key, key_hint=key_hint
+ )
+
+ @status_to_exception(CacheError)
+ def contains_keys(self, keys: Iterable) -> bool:
+ """
+ Returns a value indicating whether all given keys are present in cache.
+
+ :param keys: a list of keys or (key, type hint) tuples,
+ :return: boolean `True` when all keys are present, `False` otherwise.
+ """
+ return cache_contains_keys(self._client, self._cache_id, keys)
+
+ @status_to_exception(CacheError)
+ def get_and_put(self, key, value, key_hint=None, value_hint=None) -> Any:
+ """
+ Puts a value with a given key to cache, and returns the previous value
+ for that key, or null value if there was not such key.
+
+ :param key: key for the cache entry. Can be of any supported type,
+ :param value: value for the key,
+ :param key_hint: (optional) Ignite data type, for which the given key
+ should be converted,
+ :param value_hint: (optional) Ignite data type, for which the given
+ value should be converted.
+ :return: old value or None.
+ """
+ result = cache_get_and_put(
+ self._client, self._cache_id, key, value, key_hint, value_hint
+ )
+ result.value = self._process_binary(result.value)
+ return result
+
+ @status_to_exception(CacheError)
+ def get_and_put_if_absent(
+ self, key, value, key_hint=None, value_hint=None
+ ):
+ """
+ Puts a value with a given key to cache only if the key does not
+ already exist.
+
+ :param key: key for the cache entry. Can be of any supported type,
+ :param value: value for the key,
+ :param key_hint: (optional) Ignite data type, for which the given key
+ should be converted,
+ :param value_hint: (optional) Ignite data type, for which the given
+ value should be converted,
+ :return: old value or None.
+ """
+ result = cache_get_and_put_if_absent(
+ self._client, self._cache_id, key, value, key_hint, value_hint
+ )
+ result.value = self._process_binary(result.value)
+ return result
+
+ @status_to_exception(CacheError)
+ def put_if_absent(self, key, value, key_hint=None, value_hint=None):
+ """
+ Puts a value with a given key to cache only if the key does not
+ already exist.
+
+ :param key: key for the cache entry. Can be of any supported type,
+ :param value: value for the key,
+ :param key_hint: (optional) Ignite data type, for which the given key
+ should be converted,
+ :param value_hint: (optional) Ignite data type, for which the given
+ value should be converted.
+ """
+ return cache_put_if_absent(
+ self._client, self._cache_id, key, value, key_hint, value_hint
+ )
+
+ @status_to_exception(CacheError)
+ def get_and_remove(self, key, key_hint=None) -> Any:
+ """
+ Removes the cache entry with specified key, returning the value.
+
+ :param key: key for the cache entry. Can be of any supported type,
+ :param key_hint: (optional) Ignite data type, for which the given key
+ should be converted,
+ :return: old value or None.
+ """
+ result = cache_get_and_remove(
+ self._client, self._cache_id, key, key_hint
+ )
+ result.value = self._process_binary(result.value)
+ return result
+
+ @status_to_exception(CacheError)
+ def get_and_replace(
+ self, key, value, key_hint=None, value_hint=None
+ ) -> Any:
+ """
+ Puts a value with a given key to cache, returning previous value
+ for that key, if and only if there is a value currently mapped
+ for that key.
+
+ :param key: key for the cache entry. Can be of any supported type,
+ :param value: value for the key,
+ :param key_hint: (optional) Ignite data type, for which the given key
+ should be converted,
+ :param value_hint: (optional) Ignite data type, for which the given
+ value should be converted.
+ :return: old value or None.
+ """
+ result = cache_get_and_replace(
+ self._client, self._cache_id, key, value, key_hint, value_hint
+ )
+ result.value = self._process_binary(result.value)
+ return result
+
+ @status_to_exception(CacheError)
+ def remove_key(self, key, key_hint=None):
+ """
+ Removes the cache entry with a given key, notifying listeners
+ and cache writers.
+
+ :param key: key for the cache entry,
+ :param key_hint: (optional) Ignite data type, for which the given key
+ should be converted.
+ """
+ return cache_remove_key(self._client, self._cache_id, key, key_hint)
+
+ @status_to_exception(CacheError)
+ def remove_keys(self, keys: list):
+ """
+ Removes cache entries by given list of keys, notifying listeners
+ and cache writers.
+
+ :param keys: list of keys or tuples of (key, key_hint) to remove.
+ """
+ return cache_remove_keys(self._client, self._cache_id, keys)
+
+ @status_to_exception(CacheError)
+ def remove_all(self):
+ """
+ Removes all cache entries, notifying listeners and cache writers.
+ """
+ return cache_remove_all(self._client, self._cache_id)
+
+ @status_to_exception(CacheError)
+ def remove_if_equals(self, key, sample, key_hint=None, sample_hint=None):
+ """
+ Removes an entry with a given key if provided value is equal to
+ actual value, notifying listeners and cache writers.
+
+ :param key: key for the cache entry,
+ :param sample: a sample to compare the stored value with,
+ :param key_hint: (optional) Ignite data type, for which the given key
+ should be converted,
+ :param sample_hint: (optional) Ignite data type, for which
+ the given sample should be converted.
+ """
+ return cache_remove_if_equals(
+ self._client, self._cache_id, key, sample, key_hint, sample_hint
+ )
+
+ @status_to_exception(CacheError)
+ def replace_if_equals(
+ self, key, sample, value,
+ key_hint=None, sample_hint=None, value_hint=None
+ ) -> Any:
+ """
+ Puts a value with a given key to cache only if the key already exists
+ and value equals provided sample.
+
+ :param key: key for the cache entry,
+ :param sample: a sample to compare the stored value with,
+ :param value: new value for the given key,
+ :param key_hint: (optional) Ignite data type, for which the given key
+ should be converted,
+ :param sample_hint: (optional) Ignite data type, for which
+ the given sample should be converted,
+ :param value_hint: (optional) Ignite data type, for which the given
+ value should be converted,
+ :return: the value reported by the server for this operation, with
+ a wrapped Binary Object unwrapped. NOTE(review): presumably a boolean
+ telling whether the replacement took place; confirm against
+ `cache_replace_if_equals`.
+ """
+ result = cache_replace_if_equals(
+ self._client, self._cache_id, key, sample, value,
+ key_hint, sample_hint, value_hint
+ )
+ result.value = self._process_binary(result.value)
+ return result
+
+ @status_to_exception(CacheError)
+ def get_size(self, peek_modes=0):
+ """
+ Gets the number of entries in cache.
+
+ :param peek_modes: (optional) limit count to near cache partition
+ (PeekModes.NEAR), primary cache (PeekModes.PRIMARY), or backup cache
+ (PeekModes.BACKUP). Defaults to all cache partitions (PeekModes.ALL),
+ :return: integer number of cache entries.
+ """
+ return cache_get_size(self._client, self._cache_id, peek_modes)
+
+ def scan(self, page_size: int=1, partitions: int=-1, local: bool=False):
+ """
+ Returns all key-value pairs from the cache, similar to `get_all`, but
+ with internal pagination, which is slower, but safer.
+
+ :param page_size: (optional) page size. Default size is 1 (slowest
+ and safest),
+ :param partitions: (optional) number of partitions to query
+ (negative to query entire cache),
+ :param local: (optional) pass True if this query should be executed
+ on local node only. Defaults to False,
+ :return: generator with key-value pairs.
+ """
+ result = scan(self._client, self._cache_id, page_size, partitions, local)
+ if result.status != 0:
+ raise CacheError(result.message)
+
+ cursor = result.value['cursor']
+ for k, v in result.value['data'].items():
+ k = self._process_binary(k)
+ v = self._process_binary(v)
+ yield k, v
+
+ while result.value['more']:
+ result = scan_cursor_get_page(self._client, cursor)
+ if result.status != 0:
+ raise CacheError(result.message)
+
+ for k, v in result.value['data'].items():
+ k = self._process_binary(k)
+ v = self._process_binary(v)
+ yield k, v
+
+ def select_row(
+ self, query_str: str, page_size: int=1,
+ query_args: Optional[list]=None, distributed_joins: bool=False,
+ replicated_only: bool=False, local: bool=False, timeout: int=0
+ ):
+ """
+ Executes a simplified SQL SELECT query over data stored in the cache.
+ The query returns the whole record (key and value).
+
+ :param query_str: SQL query string,
+ :param page_size: (optional) cursor page size. Default is 1, which
+ means that client makes one server call per row,
+ :param query_args: (optional) query arguments,
+ :param distributed_joins: (optional) distributed joins. Defaults
+ to False,
+ :param replicated_only: (optional) whether query contains only
+ replicated tables or not. Defaults to False,
+ :param local: (optional) pass True if this query should be executed
+ on local node only. Defaults to False,
+ :param timeout: (optional) non-negative timeout value in ms. Zero
+ disables timeout (default),
+ :return: generator with key-value pairs.
+ """
+ def generate_result(value):
+ cursor = value['cursor']
+ more = value['more']
+ for k, v in value['data'].items():
+ k = self._process_binary(k)
+ v = self._process_binary(v)
+ yield k, v
+
+ while more:
+ inner_result = sql_cursor_get_page(self._client, cursor)
+ if result.status != 0:
+ raise SQLError(result.message)
+ more = inner_result.value['more']
+ for k, v in inner_result.value['data'].items():
+ k = self._process_binary(k)
+ v = self._process_binary(v)
+ yield k, v
+
+ type_name = self.settings[
+ prop_codes.PROP_QUERY_ENTITIES
+ ][0]['value_type_name']
+ if not type_name:
+ raise SQLError('Value type is unknown')
+ result = sql(
+ self._client,
+ self._cache_id,
+ type_name,
+ query_str,
+ page_size,
+ query_args,
+ distributed_joins,
+ replicated_only,
+ local,
+ timeout
+ )
+ if result.status != 0:
+ raise SQLError(result.message)
+
+ return generate_result(result.value)
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e547b13/modules/platforms/python/pyignite/client.py
----------------------------------------------------------------------
diff --git a/modules/platforms/python/pyignite/client.py b/modules/platforms/python/pyignite/client.py
new file mode 100644
index 0000000..d5a9464
--- /dev/null
+++ b/modules/platforms/python/pyignite/client.py
@@ -0,0 +1,406 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module contains `Client` class, that lets you communicate with Apache
+Ignite cluster node by the means of Ignite binary client protocol.
+
+To start the communication, you may connect to the node of your choice
+by instantiating the `Client` object and calling
+:py:meth:`~pyignite.connection.Connection.connect` method with proper
+parameters.
+
+The whole storage room of Ignite cluster is split up into named structures,
+called caches. For accessing the particular cache in key-value style
+(a-la Redis or memcached) you should first create
+the :class:`~pyignite.cache.Cache` object by calling
+:py:meth:`~pyignite.client.Client.create_cache` or
+:py:meth:`~pyignite.client.Client.get_or_create_cache()` methods, then call
+:class:`~pyignite.cache.Cache` methods. If you wish to create a cache object
+without communicating with server, there is also a
+:py:meth:`~pyignite.client.Client.get_cache()` method that does just that.
+
+For using Ignite SQL, call :py:meth:`~pyignite.client.Client.sql` method.
+It returns a generator with result rows.
+
+:py:meth:`~pyignite.client.Client.register_binary_type` and
+:py:meth:`~pyignite.client.Client.query_binary_type` methods operate
+the local (class-wise) registry for Ignite Complex objects.
+"""
+
+from collections import defaultdict, OrderedDict
+from typing import Iterable, Type, Union
+
+from .api.binary import get_binary_type, put_binary_type
+from .api.cache_config import cache_get_names
+from .api.sql import sql_fields, sql_fields_cursor_get_page
+from .cache import Cache
+from .connection import Connection
+from .constants import *
+from .datatypes import BinaryObject
+from .datatypes.internal import tc_map
+from .exceptions import BinaryTypeError, CacheError, SQLError
+from .utils import entity_id, schema_id, status_to_exception
+from .binary import GenericObjectMeta
+
+
+__all__ = ['Client']
+
+
+class Client(Connection):
+ """
+ This is the main `pyignite` class, which is built upon the
+ :class:`~pyignite.connection.Connection`. In addition to the attributes,
+ properties and methods of its parent class, `Client` implements
+ the following features:
+
+ * cache factory. Cache objects are used for key-value operations,
+ * Ignite SQL endpoint,
+ * binary types registration endpoint.
+ """
+
+ _registry = defaultdict(dict)
+ _compact_footer = None
+
+ def _transfer_params(self, to: 'Client'):
+ super()._transfer_params(to)
+ to._registry = self._registry
+ to._compact_footer = self._compact_footer
+
+ def __init__(self, compact_footer: bool=None, *args, **kwargs):
+ """
+ Initialize client.
+
+ :param compact_footer: (optional) use compact (True, recommended) or
+ full (False) schema approach when serializing Complex objects.
+ Default is to use the same approach the server is using (None).
+ Apache Ignite binary protocol documentation on this topic:
+ https://apacheignite.readme.io/docs/binary-client-protocol-data-format#section-schema
+ """
+ self._compact_footer = compact_footer
+ super().__init__(*args, **kwargs)
+
+ @status_to_exception(BinaryTypeError)
+ def get_binary_type(self, binary_type: Union[str, int]) -> dict:
+ """
+ Gets the binary type information from the Ignite server. This is quite
+ a low-level implementation of Ignite thin client protocol's
+ `OP_GET_BINARY_TYPE` operation. You would probably want to use
+ :py:meth:`~pyignite.client.Client.query_binary_type` instead.
+
+ :param binary_type: binary type name or ID,
+ :return: binary type description − a dict with the following fields:
+
+ - `type_exists`: True if the type is registered, False otherwise. In
+ the latter case all the following fields are omitted,
+ - `type_id`: Complex object type ID,
+ - `type_name`: Complex object type name,
+ - `affinity_key_field`: string value or None,
+ - `is_enum`: False in case of Complex object registration,
+ - `schemas`: a list, containing the Complex object schemas in format:
+ OrderedDict[field name: field type hint]. A schema can be empty.
+ """
+ def convert_type(tc_type: int):
+ try:
+ return tc_map(tc_type.to_bytes(1, PROTOCOL_BYTE_ORDER))
+ except (KeyError, OverflowError):
+ # if conversion to char or type lookup failed,
+ # we probably have a binary object type ID
+ return BinaryObject
+
+ def convert_schema(
+ field_ids: list, binary_fields: list
+ ) -> OrderedDict:
+ converted_schema = OrderedDict()
+ for field_id in field_ids:
+ binary_field = [
+ x
+ for x in binary_fields
+ if x['field_id'] == field_id
+ ][0]
+ converted_schema[binary_field['field_name']] = convert_type(
+ binary_field['type_id']
+ )
+ return converted_schema
+
+ result = get_binary_type(self, binary_type)
+ if result.status != 0 or not result.value['type_exists']:
+ return result
+
+ binary_fields = result.value.pop('binary_fields')
+ old_format_schemas = result.value.pop('schema')
+ result.value['schemas'] = []
+ for s_id, field_ids in old_format_schemas.items():
+ result.value['schemas'].append(
+ convert_schema(field_ids, binary_fields)
+ )
+ return result
+
+ @property
+ def compact_footer(self) -> bool:
+ """
+ This property remembers Complex object schema encoding approach when
+ decoding any Complex object, to use the same approach on Complex
+ object encoding.
+
+ :return: True if compact schema was used by server or no Complex
+ object decoding has yet taken place, False if full schema was used.
+ """
+ # this is an ordinary object property, but its backing storage
+ # is a class attribute
+
+ # use compact schema by default, but leave initial (falsy) backing
+ # value unchanged
+ return (
+ self.__class__._compact_footer
+ or self.__class__._compact_footer is None
+ )
+
+ @compact_footer.setter
+ def compact_footer(self, value: bool):
+ # normally schema approach should not change
+ if self.__class__._compact_footer not in (value, None):
+ raise Warning('Can not change client schema approach.')
+ else:
+ self.__class__._compact_footer = value
+
+ @status_to_exception(BinaryTypeError)
+ def put_binary_type(
+ self, type_name: str, affinity_key_field: str=None,
+ is_enum=False, schema: dict=None
+ ):
+ """
+ Registers binary type information in cluster. Do not update binary
+ registry. This is a literal implementation of Ignite thin client
+ protocol's `OP_PUT_BINARY_TYPE` operation. You would probably want
+ to use :py:meth:`~pyignite.client.Client.register_binary_type` instead.
+
+ :param type_name: name of the data type being registered,
+ :param affinity_key_field: (optional) name of the affinity key field,
+ :param is_enum: (optional) register enum if True, binary object
+ otherwise. Defaults to False,
+ :param schema: (optional) when register enum, pass a dict
+ of enumerated parameter names as keys and an integers as values.
+ When register binary type, pass a dict of field names: field types.
+ Binary type with no fields is OK.
+ """
+ return put_binary_type(
+ self, type_name, affinity_key_field, is_enum, schema
+ )
+
+ @staticmethod
+ def _create_dataclass(type_name: str, schema: OrderedDict=None) -> Type:
+ """
+ Creates default (generic) class for Ignite Complex object.
+
+ :param type_name: Complex object type name,
+ :param schema: Complex object schema,
+ :return: the resulting class.
+ """
+ schema = schema or {}
+ return GenericObjectMeta(type_name, (), {}, schema=schema)
+
+ def _sync_binary_registry(self, type_id: int):
+ """
+ Reads Complex object description from Ignite server. Creates default
+ Complex object classes and puts in registry, if not already there.
+
+ :param type_id: Complex object type ID.
+ """
+ type_info = self.get_binary_type(type_id)
+ if type_info['type_exists']:
+ for schema in type_info['schemas']:
+ if not self._registry[type_id].get(schema_id(schema), None):
+ data_class = self._create_dataclass(
+ type_info['type_name'],
+ schema,
+ )
+ self._registry[type_id][schema_id(schema)] = data_class
+
+ def register_binary_type(
+ self, data_class: Type, affinity_key_field: str=None,
+ ):
+ """
+ Register the given class as a representation of a certain Complex
+ object type. Discards autogenerated or previously registered class.
+
+ :param data_class: Complex object class,
+ :param affinity_key_field: (optional) affinity parameter.
+ """
+ if not self.query_binary_type(
+ data_class.type_id, data_class.schema_id
+ ):
+ self.put_binary_type(
+ data_class.type_name,
+ affinity_key_field,
+ schema=data_class.schema,
+ )
+ self._registry[data_class.type_id][data_class.schema_id] = data_class
+
+ def query_binary_type(
+ self, binary_type: Union[int, str], schema: Union[int, dict]=None,
+ sync: bool=True
+ ):
+ """
+ Queries the registry of Complex object classes.
+
+ :param binary_type: Complex object type name or ID,
+ :param schema: (optional) Complex object schema or schema ID,
+ :param sync: (optional) look up the Ignite server for registered
+ Complex objects and create data classes for them if needed,
+ :return: found dataclass or None, if `schema` parameter is provided,
+ a dict of {schema ID: dataclass} format otherwise.
+ """
+ type_id = entity_id(binary_type)
+ s_id = schema_id(schema)
+
+ if schema:
+ try:
+ result = self._registry[type_id][s_id]
+ except KeyError:
+ result = None
+ else:
+ result = self._registry[type_id]
+
+ if sync and not result:
+ self._sync_binary_registry(type_id)
+ return self.query_binary_type(type_id, s_id, sync=False)
+
+ return result
+
+ def create_cache(self, settings: Union[str, dict]) -> 'Cache':
+ """
+ Creates Ignite cache by name. Raises `CacheCreationError` if the server
+ reports a failure, e.g. when such a cache already exists.
+
+ :param settings: cache name or dict of cache properties' codes
+ and values. All cache properties are documented here:
+ :ref:`cache_props`. See also the
+ :ref:`cache creation example <sql_cache_create>`,
+ :return: :class:`~pyignite.cache.Cache` object.
+ """
+ return Cache(self, settings)
+
+ def get_or_create_cache(self, settings: Union[str, dict]) -> 'Cache':
+ """
+ Creates Ignite cache, if it does not exist yet.
+
+ :param settings: cache name or dict of cache properties' codes
+ and values. All cache properties are documented here:
+ :ref:`cache_props`. See also the
+ :ref:`cache creation example <sql_cache_create>`,
+ :return: :class:`~pyignite.cache.Cache` object.
+ """
+ return Cache(self, settings, with_get=True)
+
+ def get_cache(self, settings: Union[str, dict]) -> 'Cache':
+ """
+ Creates Cache object with a given cache name without checking it up
+ on server. If such a cache does not exist, some kind of exception
+ (most probably `CacheError`) may be raised later.
+
+ :param settings: cache name or cache properties (but only `PROP_NAME`
+ property is allowed),
+ :return: :class:`~pyignite.cache.Cache` object.
+ """
+ return Cache(self, settings, get_only=True)
+
+ @status_to_exception(CacheError)
+ def get_cache_names(self) -> list:
+ """
+ Gets existing cache names.
+
+ :return: list of cache names.
+ """
+ return cache_get_names(self)
+
+ def sql(
+ self, query_str: str, page_size: int=1, query_args: Iterable=None,
+ schema: Union[int, str]='PUBLIC',
+ statement_type: int=0, distributed_joins: bool=False,
+ local: bool=False, replicated_only: bool=False,
+ enforce_join_order: bool=False, collocated: bool=False,
+ lazy: bool=False, include_field_names: bool=False,
+ max_rows: int=-1, timeout: int=0,
+ ):
+ """
+ Runs an SQL query and returns its result.
+
+ :param query_str: SQL query string,
+ :param page_size: (optional) cursor page size. Default is 1, which
+ means that client makes one server call per row,
+ :param query_args: (optional) query arguments. List of values or
+ (value, type hint) tuples,
+ :param schema: (optional) schema for the query. Defaults to `PUBLIC`,
+ :param statement_type: (optional) statement type. Can be:
+
+ * StatementType.ALL − any type (default),
+ * StatementType.SELECT − select,
+ * StatementType.UPDATE − update.
+
+ :param distributed_joins: (optional) distributed joins. Defaults
+ to False,
+ :param local: (optional) pass True if this query should be executed
+ on local node only. Defaults to False,
+ :param replicated_only: (optional) whether query contains only
+ replicated tables or not. Defaults to False,
+ :param enforce_join_order: (optional) enforce join order. Defaults
+ to False,
+ :param collocated: (optional) whether your data is co-located or not.
+ Defaults to False,
+ :param lazy: (optional) lazy query execution. Defaults to False,
+ :param include_field_names: (optional) include field names in result.
+ Defaults to False,
+ :param max_rows: (optional) query-wide maximum of rows. Defaults to -1
+ (all rows),
+ :param timeout: (optional) non-negative timeout value in ms.
+ Zero disables timeout (default),
+ :return: generator with result rows as a lists. If
+ `include_field_names` was set, the first row will hold field names.
+ """
+ def generate_result(value):
+ cursor = value['cursor']
+ more = value['more']
+
+ if include_field_names:
+ yield value['fields']
+ field_count = len(value['fields'])
+ else:
+ field_count = value['field_count']
+ for line in value['data']:
+ yield line
+
+ while more:
+ inner_result = sql_fields_cursor_get_page(
+ self, cursor, field_count
+ )
+ if inner_result.status != 0:
+ raise SQLError(result.message)
+ more = inner_result.value['more']
+ for line in inner_result.value['data']:
+ yield line
+
+ schema = self.get_or_create_cache(schema)
+ result = sql_fields(
+ self, schema.cache_id, query_str,
+ page_size, query_args, schema.name,
+ statement_type, distributed_joins, local, replicated_only,
+ enforce_join_order, collocated, lazy, include_field_names,
+ max_rows, timeout,
+ )
+ if result.status != 0:
+ raise SQLError(result.message)
+
+ return generate_result(result.value)
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e547b13/modules/platforms/python/pyignite/connection/__init__.py
----------------------------------------------------------------------
diff --git a/modules/platforms/python/pyignite/connection/__init__.py b/modules/platforms/python/pyignite/connection/__init__.py
new file mode 100644
index 0000000..32decdf
--- /dev/null
+++ b/modules/platforms/python/pyignite/connection/__init__.py
@@ -0,0 +1,329 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module contains `Connection` class, that wraps TCP socket handling,
+as well as Ignite protocol handshaking.
+"""
+
+import socket
+
+from pyignite.constants import *
+from pyignite.exceptions import (
+ HandshakeError, ParameterError, ReconnectError, SocketError,
+)
+
+from pyignite.utils import is_iterable
+from .handshake import HandshakeRequest, read_response
+from .ssl import wrap
+
+
+__all__ = ['Connection']
+
+
+class Connection:
+ """
+ This is a `pyignite` class, that represents a connection to Ignite
+ node. It serves multiple purposes:
+
+ * socket wrapper. Detects fragmentation and network errors. See also
+ https://docs.python.org/3/howto/sockets.html,
+ * binary protocol connector. Incapsulates handshake, data read-ahead and
+ failover reconnection.
+ """
+
+ _socket = None
+ nodes = None
+ host = None
+ port = None
+ timeout = None
+ prefetch = None
+ username = None
+ password = None
+
+ @staticmethod
+ def _check_kwargs(kwargs):
+ expected_args = [
+ 'timeout',
+ 'use_ssl',
+ 'ssl_version',
+ 'ssl_ciphers',
+ 'ssl_cert_reqs',
+ 'ssl_keyfile',
+ 'ssl_certfile',
+ 'ssl_ca_certfile',
+ 'username',
+ 'password',
+ ]
+ for kw in kwargs:
+ if kw not in expected_args:
+ raise ParameterError((
+ 'Unexpected parameter for connection initialization: `{}`'
+ ).format(kw))
+
+ def __init__(self, prefetch: bytes=b'', **kwargs):
+ """
+ Initialize connection.
+
+ For the use of the SSL-related parameters see
+ https://docs.python.org/3/library/ssl.html#ssl-certificates.
+
+ :param prefetch: (optional) initialize the read-ahead data buffer.
+ Empty by default,
+ :param timeout: (optional) sets timeout (in seconds) for each socket
+ operation including `connect`. 0 means non-blocking mode, which is
+ virtually guaranteed to fail. Can accept integer or float value.
+ Default is None (blocking mode),
+ :param use_ssl: (optional) set to True if Ignite server uses SSL
+ on its binary connector. Defaults to use SSL when username
+ and password has been supplied, not to use SSL otherwise,
+ :param ssl_version: (optional) SSL version constant from standard
+ `ssl` module. Defaults to TLS v1.1, as in Ignite 2.5,
+ :param ssl_ciphers: (optional) ciphers to use. If not provided,
+ `ssl` default ciphers are used,
+ :param ssl_cert_reqs: (optional) determines how the remote side
+ certificate is treated:
+
+ * `ssl.CERT_NONE` − remote certificate is ignored (default),
+ * `ssl.CERT_OPTIONAL` − remote certificate will be validated,
+ if provided,
+ * `ssl.CERT_REQUIRED` − valid remote certificate is required,
+
+ :param ssl_keyfile: (optional) a path to SSL key file to identify
+ local (client) party,
+ :param ssl_certfile: (optional) a path to ssl certificate file
+ to identify local (client) party,
+ :param ssl_ca_certfile: (optional) a path to a trusted certificate
+ or a certificate chain. Required to check the validity of the remote
+ (server-side) certificate,
+ :param username: (optional) user name to authenticate to Ignite
+ cluster,
+ :param password: (optional) password to authenticate to Ignite cluster.
+ """
+ self.prefetch = prefetch
+ self._check_kwargs(kwargs)
+ self.timeout = kwargs.pop('timeout', None)
+ self.username = kwargs.pop('username', None)
+ self.password = kwargs.pop('password', None)
+ if all([self.username, self.password, 'use_ssl' not in kwargs]):
+ kwargs['use_ssl'] = True
+ self.init_kwargs = kwargs
+
+ read_response = read_response
+ _wrap = wrap
+
+ @property
+ def socket(self) -> socket.socket:
+ """
+ Network socket.
+ """
+ if self._socket is None:
+ self._reconnect()
+ return self._socket
+
+ def __repr__(self) -> str:
+ if self.host and self.port:
+ return '{}:{}'.format(self.host, self.port)
+ else:
+ return '<not connected>'
+
+ def _connect(self, host: str, port: int):
+ """
+ Actually connect socket.
+ """
+ self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._socket.settimeout(self.timeout)
+ self._socket = self._wrap(self.socket)
+ self._socket.connect((host, port))
+
+ hs_request = HandshakeRequest(self.username, self.password)
+ self.send(hs_request)
+ hs_response = self.read_response()
+ if hs_response['op_code'] == 0:
+ self.close()
+ error_text = 'Handshake error: {}'.format(hs_response['message'])
+ # if handshake fails for any reason other than protocol mismatch
+ # (i.e. authentication error), server version is 0.0.0
+ if any([
+ hs_response['version_major'],
+ hs_response['version_minor'],
+ hs_response['version_patch'],
+ ]):
+ error_text += (
+ ' Server expects binary protocol version '
+ '{version_major}.{version_minor}.{version_patch}. Client '
+ 'provides {client_major}.{client_minor}.{client_patch}.'
+ ).format(
+ client_major=PROTOCOL_VERSION_MAJOR,
+ client_minor=PROTOCOL_VERSION_MINOR,
+ client_patch=PROTOCOL_VERSION_PATCH,
+ **hs_response
+ )
+ raise HandshakeError(error_text)
+ self.host, self.port = host, port
+
+ def connect(self, *args):
+ """
+ Connect to the server. Connection parameters may be either one node
+ (host and port), or list (or other iterable) of nodes.
+
+ :param host: Ignite server host,
+ :param port: Ignite server port,
+ :param nodes: iterable of (host, port) tuples.
+ """
+ self.nodes = iter([])
+ if len(args) == 0:
+ host, port = IGNITE_DEFAULT_HOST, IGNITE_DEFAULT_PORT
+ elif len(args) == 1 and is_iterable(args[0]):
+ self.nodes = iter(args[0])
+ host, port = next(self.nodes)
+ elif (
+ len(args) == 2
+ and isinstance(args[0], str)
+ and isinstance(args[1], int)
+ ):
+ host, port = args
+ else:
+ raise ConnectionError('Connection parameters are not valid.')
+
+ self._connect(host, port)
+
+ def _reconnect(self):
+ """
+ Restore the connection using the next node in `nodes` iterable.
+ """
+ for host, port in self.nodes:
+ try:
+ self._connect(host, port)
+ return
+ except OSError:
+ pass
+ self.host = self.port = self.nodes = None
+ # exception chaining gives a misleading traceback here
+ raise ReconnectError('Can not reconnect: out of nodes') from None
+
+ def _transfer_params(self, to: 'Connection'):
+ """
+ Transfer non-SSL parameters to target connection object.
+
+ :param target: connection object to transfer parameters to.
+ """
+ to.username = self.username
+ to.password = self.password
+ to.nodes = self.nodes
+
+ def clone(self, prefetch: bytes=b'') -> 'Connection':
+ """
+ Clones this connection in its current state.
+
+ :return: `Connection` object.
+ """
+ clone = self.__class__(**self.init_kwargs)
+ self._transfer_params(to=clone)
+ if self.port and self.host:
+ clone._connect(self.host, self.port)
+ clone.prefetch = prefetch
+ return clone
+
+ def send(self, data: bytes, flags=None):
+ """
+ Send data down the socket.
+
+ :param data: bytes to send,
+ :param flags: (optional) OS-specific flags.
+ """
+ kwargs = {}
+ if flags is not None:
+ kwargs['flags'] = flags
+ data = bytes(data)
+ total_bytes_sent = 0
+
+ while total_bytes_sent < len(data):
+ try:
+ bytes_sent = self.socket.send(data[total_bytes_sent:], **kwargs)
+ except OSError:
+ self._socket = self.host = self.port = None
+ raise
+ if bytes_sent == 0:
+ self.socket.close()
+ raise SocketError('Socket connection broken.')
+ total_bytes_sent += bytes_sent
+
+ def recv(self, buffersize, flags=None) -> bytes:
+ """
+ Receive data from socket or read-ahead buffer.
+
+ :param buffersize: bytes to receive,
+ :param flags: (optional) OS-specific flags,
+ :return: data received.
+ """
+ pref_size = len(self.prefetch)
+ if buffersize > pref_size:
+ result = self.prefetch
+ self.prefetch = b''
+ try:
+ result += self._recv(buffersize-pref_size, flags)
+ except (SocketError, OSError):
+ self._socket = self.host = self.port = None
+ raise
+ return result
+ else:
+ result = self.prefetch[:buffersize]
+ self.prefetch = self.prefetch[buffersize:]
+ return result
+
+ def _recv(self, buffersize, flags=None) -> bytes:
+ """
+ Handle socket data reading.
+ """
+ kwargs = {}
+ if flags is not None:
+ kwargs['flags'] = flags
+ chunks = []
+ bytes_rcvd = 0
+
+ while bytes_rcvd < buffersize:
+ chunk = self.socket.recv(buffersize-bytes_rcvd, **kwargs)
+ if chunk == b'':
+ self.socket.close()
+ raise SocketError('Socket connection broken.')
+ chunks.append(chunk)
+ bytes_rcvd += len(chunk)
+
+ return b''.join(chunks)
+
+ def close(self):
+ """
+ Mark socket closed. This is recommended but not required, since
+ sockets are automatically closed when they are garbage-collected.
+ """
+ self._socket.shutdown(socket.SHUT_RDWR)
+ self._socket.close()
+ self._socket = self.host = self.port = None
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e547b13/modules/platforms/python/pyignite/connection/generators.py
----------------------------------------------------------------------
diff --git a/modules/platforms/python/pyignite/connection/generators.py b/modules/platforms/python/pyignite/connection/generators.py
new file mode 100644
index 0000000..d76db0e
--- /dev/null
+++ b/modules/platforms/python/pyignite/connection/generators.py
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class RoundRobin:
+ """
+ Round-robin generator for use with `Client.connect()`. Cycles a node
+ list until a maximum number of reconnects is reached (if set).
+ """
+
+ def __init__(self, nodes: list, max_reconnects: int=None):
+ """
+ :param nodes: list of two-tuples of (host, port) format,
+ :param max_reconnects: (optional) maximum number of reconnect attempts.
+ defaults to None (cycle nodes infinitely).
+ """
+ self.nodes = nodes
+ self.max_reconnects = max_reconnects
+ self.node_index = 0
+ self.reconnects = 0
+
+ def __iter__(self) -> 'RoundRobin':
+ return self
+
+ def __next__(self) -> tuple:
+ if self.max_reconnects is not None:
+ if self.reconnects >= self.max_reconnects:
+ raise StopIteration
+ else:
+ self.reconnects += 1
+
+ if self.node_index >= len(self.nodes):
+ self.node_index = 0
+ node = self.nodes[self.node_index]
+ self.node_index += 1
+ return node
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e547b13/modules/platforms/python/pyignite/connection/handshake.py
----------------------------------------------------------------------
diff --git a/modules/platforms/python/pyignite/connection/handshake.py b/modules/platforms/python/pyignite/connection/handshake.py
new file mode 100644
index 0000000..13d57fe
--- /dev/null
+++ b/modules/platforms/python/pyignite/connection/handshake.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Optional
+
+from pyignite.constants import *
+from pyignite.datatypes import Byte, Int, Short, String
+from pyignite.datatypes.internal import Struct
+
+# Value placed in the `op_code` field of the handshake request.
+OP_HANDSHAKE = 1
+
+
+class HandshakeRequest:
+ """ Handshake request. """
+ handshake_struct = None
+ username = None
+ password = None
+
+ def __init__(
+ self, username: Optional[str]=None, password: Optional[str]=None
+ ):
+ fields = [
+ ('length', Int),
+ ('op_code', Byte),
+ ('version_major', Short),
+ ('version_minor', Short),
+ ('version_patch', Short),
+ ('client_code', Byte),
+ ]
+ if username and password:
+ self.username = username
+ self.password = password
+ fields.extend([
+ ('username', String),
+ ('password', String),
+ ])
+ self.handshake_struct = Struct(fields)
+
+ def __bytes__(self) -> bytes:
+ handshake_data = {
+ 'length': 8,
+ 'op_code': OP_HANDSHAKE,
+ 'version_major': PROTOCOL_VERSION_MAJOR,
+ 'version_minor': PROTOCOL_VERSION_MINOR,
+ 'version_patch': PROTOCOL_VERSION_PATCH,
+ 'client_code': 2, # fixed value defined by protocol
+ }
+ if self.username and self.password:
+ handshake_data.update({
+ 'username': self.username,
+ 'password': self.password,
+ })
+ handshake_data['length'] += sum([
+ 10, # each `String` header takes 5 bytes
+ len(self.username),
+ len(self.password),
+ ])
+ return self.handshake_struct.from_python(handshake_data)
+
+
+def read_response(client):
+ response_start = Struct([
+ ('length', Int),
+ ('op_code', Byte),
+ ])
+ start_class, start_buffer = response_start.parse(client)
+ start = start_class.from_buffer_copy(start_buffer)
+ data = response_start.to_python(start)
+ if data['op_code'] == 0:
+ response_end = Struct([
+ ('version_major', Short),
+ ('version_minor', Short),
+ ('version_patch', Short),
+ ('message', String),
+ ])
+ end_class, end_buffer = response_end.parse(client)
+ end = end_class.from_buffer_copy(end_buffer)
+ data.update(response_end.to_python(end))
+ return data
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e547b13/modules/platforms/python/pyignite/connection/ssl.py
----------------------------------------------------------------------
diff --git a/modules/platforms/python/pyignite/connection/ssl.py b/modules/platforms/python/pyignite/connection/ssl.py
new file mode 100644
index 0000000..548ca7f
--- /dev/null
+++ b/modules/platforms/python/pyignite/connection/ssl.py
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import ssl
+
+from pyignite.constants import *
+
+
+def wrap(client, _socket):
+ """ Wrap socket in SSL wrapper. """
+ if client.init_kwargs.get('use_ssl', None):
+ _socket = ssl.wrap_socket(
+ _socket,
+ ssl_version=client.init_kwargs.get(
+ 'ssl_version', SSL_DEFAULT_VERSION
+ ),
+ ciphers=client.init_kwargs.get(
+ 'ssl_ciphers', SSL_DEFAULT_CIPHERS
+ ),
+ cert_reqs=client.init_kwargs.get(
+ 'ssl_cert_reqs', ssl.CERT_NONE
+ ),
+ keyfile=client.init_kwargs.get('ssl_keyfile', None),
+ certfile=client.init_kwargs.get('ssl_certfile', None),
+ ca_certs=client.init_kwargs.get('ssl_ca_certfile', None),
+ )
+ return _socket
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e547b13/modules/platforms/python/pyignite/constants.py
----------------------------------------------------------------------
diff --git a/modules/platforms/python/pyignite/constants.py b/modules/platforms/python/pyignite/constants.py
new file mode 100644
index 0000000..78c9379
--- /dev/null
+++ b/modules/platforms/python/pyignite/constants.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module contains some constants, used internally throughout the API.
+"""
+
+import ssl
+
+
+__all__ = [
+ 'PROTOCOL_VERSION_MAJOR', 'PROTOCOL_VERSION_MINOR',
+ 'PROTOCOL_VERSION_PATCH', 'MAX_LONG', 'MIN_LONG', 'MAX_INT', 'MIN_INT',
+ 'PROTOCOL_BYTE_ORDER', 'PROTOCOL_STRING_ENCODING',
+ 'PROTOCOL_CHAR_ENCODING', 'SSL_DEFAULT_VERSION', 'SSL_DEFAULT_CIPHERS',
+ 'FNV1_OFFSET_BASIS', 'FNV1_PRIME',
+ 'IGNITE_DEFAULT_HOST', 'IGNITE_DEFAULT_PORT',
+]
+
+PROTOCOL_VERSION_MAJOR = 1
+PROTOCOL_VERSION_MINOR = 2
+PROTOCOL_VERSION_PATCH = 0
+
+MAX_LONG = 9223372036854775807
+MIN_LONG = -9223372036854775808
+MAX_INT = 2147483647
+MIN_INT = -2147483648
+
+PROTOCOL_BYTE_ORDER = 'little'
+PROTOCOL_STRING_ENCODING = 'utf-8'
+PROTOCOL_CHAR_ENCODING = 'utf-16le'
+
+SSL_DEFAULT_VERSION = ssl.PROTOCOL_TLSv1_1
+SSL_DEFAULT_CIPHERS = ssl._DEFAULT_CIPHERS
+
+FNV1_OFFSET_BASIS = 0x811c9dc5
+FNV1_PRIME = 0x01000193
+
+IGNITE_DEFAULT_HOST = 'localhost'
+IGNITE_DEFAULT_PORT = 10800
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e547b13/modules/platforms/python/pyignite/datatypes/__init__.py
----------------------------------------------------------------------
diff --git a/modules/platforms/python/pyignite/datatypes/__init__.py b/modules/platforms/python/pyignite/datatypes/__init__.py
new file mode 100644
index 0000000..5024f79
--- /dev/null
+++ b/modules/platforms/python/pyignite/datatypes/__init__.py
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module contains classes, used internally by `pyignite` for parsing and
+creating binary data.
+"""
+
+from .complex import *
+from .internal import *
+from .null_object import *
+from .primitive import *
+from .primitive_arrays import *
+from .primitive_objects import *
+from .standard import *
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e547b13/modules/platforms/python/pyignite/datatypes/binary.py
----------------------------------------------------------------------
diff --git a/modules/platforms/python/pyignite/datatypes/binary.py b/modules/platforms/python/pyignite/datatypes/binary.py
new file mode 100644
index 0000000..c2344da
--- /dev/null
+++ b/modules/platforms/python/pyignite/datatypes/binary.py
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from pyignite.datatypes import Int, Bool, String, Struct, StructArray
+
+
+# Array of field descriptors: field name, type ID and field ID per entry.
+# NOTE(review): the order of the entries in every declaration below
+# presumably mirrors the wire layout - confirm before reordering.
+binary_fields_struct = StructArray([
+ ('field_name', String),
+ ('type_id', Int),
+ ('field_id', Int),
+])
+
+# Type description: type ID and name, affinity key field name, the array
+# of field descriptors declared above and an "is enum" flag.
+body_struct = Struct([
+ ('type_id', Int),
+ ('type_name', String),
+ ('affinity_key_field', String),
+ ('binary_fields', binary_fields_struct),
+ ('is_enum', Bool),
+])
+
+# Array of (literal, type_id) pairs.
+# NOTE(review): presumably only relevant when `is_enum` is set - confirm
+# against the code that uses this structure.
+enum_struct = StructArray([
+ ('literal', String),
+ ('type_id', Int),
+])
+
+# Array of field IDs that make up one schema.
+schema_fields_struct = StructArray([
+ ('schema_field_id', Int),
+])
+
+# Array of schemas: a schema ID followed by the array of its field IDs.
+schema_struct = StructArray([
+ ('schema_id', Int),
+ ('schema_fields', schema_fields_struct),
+])
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e547b13/modules/platforms/python/pyignite/datatypes/cache_config.py
----------------------------------------------------------------------
diff --git a/modules/platforms/python/pyignite/datatypes/cache_config.py b/modules/platforms/python/pyignite/datatypes/cache_config.py
new file mode 100644
index 0000000..67b353d
--- /dev/null
+++ b/modules/platforms/python/pyignite/datatypes/cache_config.py
@@ -0,0 +1,153 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .standard import String
+from .internal import AnyDataObject, Struct, StructArray
+from .primitive import *
+
+
+__all__ = [
+ 'cache_config_struct', 'CacheMode', 'PartitionLossPolicy',
+ 'RebalanceMode', 'WriteSynchronizationMode', 'IndexType',
+]
+
+
+class CacheMode(Int):
+ """ Codes for the `cache_mode` field of `cache_config_struct`. """
+ LOCAL = 0
+ REPLICATED = 1
+ PARTITIONED = 2
+
+
+class PartitionLossPolicy(Int):
+ """ Codes for the `partition_loss_policy` field of `cache_config_struct`. """
+ READ_ONLY_SAFE = 0
+ READ_ONLY_ALL = 1
+ READ_WRITE_SAFE = 2
+ READ_WRITE_ALL = 3
+ IGNORE = 4
+
+
+class RebalanceMode(Int):
+ """ Codes for the `rebalance_mode` field of `cache_config_struct`. """
+ SYNC = 0
+ ASYNC = 1
+ NONE = 2
+
+
+class WriteSynchronizationMode(Int):
+ """
+ Codes for the `write_synchronization_mode` field
+ of `cache_config_struct`.
+ """
+ FULL_SYNC = 0
+ FULL_ASYNC = 1
+ PRIMARY_SYNC = 2
+
+
+class IndexType(Byte):
+ """
+ Codes for the `index_type` field of a `QueryIndexes` entry. Unlike
+ the other code classes of this module, it is derived from `Byte`.
+ """
+ SORTED = 0
+ FULLTEXT = 1
+ GEOSPATIAL = 2
+
+
+class CacheAtomicityMode(Int):
+ """ Codes for the `cache_atomicity_mode` field of `cache_config_struct`. """
+ TRANSACTIONAL = 0
+ ATOMIC = 1
+
+
+# Array of query field descriptions. Every entry member except `name` and
+# `type_name` has a default value.
+QueryFields = StructArray([
+ ('name', String),
+ ('type_name', String),
+ ('is_key_field', Bool),
+ ('is_notnull_constraint_field', Bool),
+ ('default_value', AnyDataObject),
+ ('precision', Int),
+ ('scale', Int),
+], defaults={
+ 'is_key_field': False,
+ 'is_notnull_constraint_field': False,
+ 'default_value': None,
+ 'precision': -1,
+ 'scale': -1,
+})
+
+
+# Array of (field name, alias) pairs.
+FieldNameAliases = StructArray([
+ ('field_name', String),
+ ('alias', String),
+])
+
+
+# Array of index fields; `is_descending` defaults to False.
+Fields = StructArray([
+ ('name', String),
+ ('is_descending', Bool),
+], defaults={
+ 'is_descending': False,
+})
+
+
+# Array of index descriptions; each entry embeds its own `Fields` array.
+QueryIndexes = StructArray([
+ ('index_name', String),
+ ('index_type', IndexType),
+ ('inline_size', Int),
+ ('fields', Fields),
+])
+
+
+# Array of query entities; each entry embeds the `QueryFields`,
+# `FieldNameAliases` and `QueryIndexes` arrays declared above.
+QueryEntities = StructArray([
+ ('key_type_name', String),
+ ('value_type_name', String),
+ ('table_name', String),
+ ('key_field_name', String),
+ ('value_field_name', String),
+ ('query_fields', QueryFields),
+ ('field_name_aliases', FieldNameAliases),
+ ('query_indexes', QueryIndexes),
+])
+
+
+# Array of (type name, affinity key field name) pairs.
+CacheKeyConfiguration = StructArray([
+ ('type_name', String),
+ ('affinity_key_field_name', String),
+])
+
+
+# Complete cache configuration record: a `length` field followed by
+# the individual cache settings, ending with the `CacheKeyConfiguration` and
+# `QueryEntities` arrays.
+# NOTE(review): the field order presumably mirrors the order in which
+# the server serializes the configuration - confirm against the protocol
+# description before reordering or inserting fields.
+cache_config_struct = Struct([
+ ('length', Int),
+ ('backups_number', Int),
+ ('cache_mode', CacheMode),
+ ('cache_atomicity_mode', CacheAtomicityMode),
+ ('copy_on_read', Bool),
+ ('data_region_name', String),
+ ('eager_ttl', Bool),
+ ('statistics_enabled', Bool),
+ ('group_name', String),
+ ('invalidate', Int),
+ ('default_lock_timeout', Long),
+ ('max_query_iterators', Int),
+ ('name', String),
+ ('is_onheap_cache_enabled', Bool),
+ ('partition_loss_policy', PartitionLossPolicy),
+ ('query_detail_metric_size', Int),
+ ('query_parallelism', Int),
+ ('read_from_backup', Bool),
+ ('rebalance_batch_size', Int),
+ ('rebalance_batches_prefetch_count', Long),
+ ('rebalance_delay', Long),
+ ('rebalance_mode', RebalanceMode),
+ ('rebalance_order', Int),
+ ('rebalance_throttle', Long),
+ ('rebalance_timeout', Long),
+ ('sql_escape_all', Bool),
+ ('sql_index_inline_max_size', Int),
+ ('sql_schema', String),
+ ('write_synchronization_mode', WriteSynchronizationMode),
+ ('cache_key_configuration', CacheKeyConfiguration),
+ ('query_entities', QueryEntities),
+])
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e547b13/modules/platforms/python/pyignite/datatypes/cache_properties.py
----------------------------------------------------------------------
diff --git a/modules/platforms/python/pyignite/datatypes/cache_properties.py b/modules/platforms/python/pyignite/datatypes/cache_properties.py
new file mode 100644
index 0000000..e94db5f
--- /dev/null
+++ b/modules/platforms/python/pyignite/datatypes/cache_properties.py
@@ -0,0 +1,287 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import ctypes
+
+from .prop_codes import *
+from .cache_config import (
+ CacheMode, CacheAtomicityMode, PartitionLossPolicy, RebalanceMode,
+ WriteSynchronizationMode, QueryEntities, CacheKeyConfiguration,
+)
+from .primitive import *
+from .standard import *
+
+
+# Names exported by a star-import of this module: the concrete `Prop*`
+# classes, `AnyProperty` and the `prop_map` lookup. `PropBase` is not
+# listed here.
+__all__ = [
+ 'PropName', 'PropCacheMode', 'PropCacheAtomicityMode', 'PropBackupsNumber',
+ 'PropWriteSynchronizationMode', 'PropCopyOnRead', 'PropReadFromBackup',
+ 'PropDataRegionName', 'PropIsOnheapCacheEnabled', 'PropQueryEntities',
+ 'PropQueryParallelism', 'PropQueryDetailMetricSize', 'PropSQLSchema',
+ 'PropSQLIndexInlineMaxSize', 'PropSqlEscapeAll', 'PropMaxQueryIterators',
+ 'PropRebalanceMode', 'PropRebalanceDelay', 'PropRebalanceTimeout',
+ 'PropRebalanceBatchSize', 'PropRebalanceBatchesPrefetchCount',
+ 'PropRebalanceOrder', 'PropRebalanceThrottle', 'PropGroupName',
+ 'PropCacheKeyConfiguration', 'PropDefaultLockTimeout',
+ 'PropMaxConcurrentAsyncOperation', 'PropPartitionLossPolicy',
+ 'PropEagerTTL', 'PropStatisticsEnabled', 'prop_map', 'AnyProperty',
+]
+
+
+def prop_map(code: int):
+    """
+    Return the property class registered for a cache property code.
+
+    :param code: one of the ``PROP_*`` property codes,
+    :return: the ``Prop*`` class whose ``prop_code`` equals `code`,
+    :raises KeyError: if `code` is not a known property code.
+    """
+    # The mapping is built on every call rather than at import time: the
+    # ``Prop*`` classes are defined further down in this module, so they
+    # do not exist yet when this function is defined.
+    return {
+        PROP_NAME: PropName,
+        PROP_CACHE_MODE: PropCacheMode,
+        PROP_CACHE_ATOMICITY_MODE: PropCacheAtomicityMode,
+        PROP_BACKUPS_NUMBER: PropBackupsNumber,
+        PROP_WRITE_SYNCHRONIZATION_MODE: PropWriteSynchronizationMode,
+        PROP_COPY_ON_READ: PropCopyOnRead,
+        PROP_READ_FROM_BACKUP: PropReadFromBackup,
+        PROP_DATA_REGION_NAME: PropDataRegionName,
+        PROP_IS_ONHEAP_CACHE_ENABLED: PropIsOnheapCacheEnabled,
+        PROP_QUERY_ENTITIES: PropQueryEntities,
+        PROP_QUERY_PARALLELISM: PropQueryParallelism,
+        PROP_QUERY_DETAIL_METRIC_SIZE: PropQueryDetailMetricSize,
+        PROP_SQL_SCHEMA: PropSQLSchema,
+        PROP_SQL_INDEX_INLINE_MAX_SIZE: PropSQLIndexInlineMaxSize,
+        PROP_SQL_ESCAPE_ALL: PropSqlEscapeAll,
+        PROP_MAX_QUERY_ITERATORS: PropMaxQueryIterators,
+        PROP_REBALANCE_MODE: PropRebalanceMode,
+        PROP_REBALANCE_DELAY: PropRebalanceDelay,
+        PROP_REBALANCE_TIMEOUT: PropRebalanceTimeout,
+        PROP_REBALANCE_BATCH_SIZE: PropRebalanceBatchSize,
+        PROP_REBALANCE_BATCHES_PREFETCH_COUNT: PropRebalanceBatchesPrefetchCount,
+        PROP_REBALANCE_ORDER: PropRebalanceOrder,
+        PROP_REBALANCE_THROTTLE: PropRebalanceThrottle,
+        PROP_GROUP_NAME: PropGroupName,
+        PROP_CACHE_KEY_CONFIGURATION: PropCacheKeyConfiguration,
+        PROP_DEFAULT_LOCK_TIMEOUT: PropDefaultLockTimeout,
+        PROP_MAX_CONCURRENT_ASYNC_OPERATIONS: PropMaxConcurrentAsyncOperation,
+        # Must be the property wrapper, like every other entry: only the
+        # ``Prop*`` classes prepend the ``prop_code`` header when
+        # serializing. The bare ``PartitionLossPolicy`` data type does not.
+        PROP_PARTITION_LOSS_POLICY: PropPartitionLossPolicy,
+        PROP_EAGER_TTL: PropEagerTTL,
+        PROP_STATISTICS_ENABLED: PropStatisticsEnabled,
+    }[code]
+
+
+class PropBase:
+    """
+    Common machinery for cache configuration properties.
+
+    A property consists of a ``prop_code`` header (a ``c_short``)
+    followed by a payload that is parsed and converted by
+    ``prop_data_class``. Subclasses set both class attributes.
+    """
+    prop_code = None
+    prop_data_class = None
+
+    @classmethod
+    def build_header(cls):
+        """
+        Create the ctypes structure type for the property header.
+
+        :return: a new packed ``LittleEndianStructure`` subclass named
+         ``<class name>Header`` with a single ``c_short`` field
+         ``prop_code``.
+        """
+        namespace = {
+            '_pack_': 1,
+            '_fields_': [('prop_code', ctypes.c_short)],
+        }
+        bases = (ctypes.LittleEndianStructure,)
+        return type(cls.__name__ + 'Header', bases, namespace)
+
+    @classmethod
+    def parse(cls, connection: 'Connection'):
+        """
+        Read one property from `connection`.
+
+        The header bytes are received first, then
+        ``prop_data_class.parse`` consumes the payload.
+
+        :param connection: object providing ``recv(size)``; it is also
+         passed on to ``prop_data_class.parse``,
+        :return: tuple of (ctypes class extending the header with a
+         ``data`` field, header bytes followed by payload bytes).
+        """
+        header_type = cls.build_header()
+        raw_header = connection.recv(ctypes.sizeof(header_type))
+        payload_type, raw_payload = cls.prop_data_class.parse(connection)
+        namespace = {
+            '_pack_': 1,
+            '_fields_': [('data', payload_type)],
+        }
+        combined_type = type(cls.__name__, (header_type,), namespace)
+        return combined_type, raw_header + raw_payload
+
+    @classmethod
+    def to_python(cls, ctype_object, *args, **kwargs):
+        """
+        Convert the ``data`` member of `ctype_object` to a Python value.
+
+        Extra positional and keyword arguments are forwarded unchanged
+        to ``prop_data_class.to_python``.
+        """
+        payload = ctype_object.data
+        return cls.prop_data_class.to_python(payload, *args, **kwargs)
+
+    @classmethod
+    def from_python(cls, value):
+        """
+        Serialize `value` as this property.
+
+        :return: header bytes carrying ``prop_code``, followed by the
+         bytes produced by ``prop_data_class.from_python(value)``.
+        """
+        header = cls.build_header()(prop_code=cls.prop_code)
+        return bytes(header) + cls.prop_data_class.from_python(value)
+
+
+class PropName(PropBase):
+    """Cache property ``PROP_NAME``; value handled by ``String``."""
+    prop_data_class = String
+    prop_code = PROP_NAME
+
+
+class PropCacheMode(PropBase):
+    """Cache property ``PROP_CACHE_MODE``; value handled by ``CacheMode``."""
+    prop_data_class = CacheMode
+    prop_code = PROP_CACHE_MODE
+
+
+class PropCacheAtomicityMode(PropBase):
+    """Cache property ``PROP_CACHE_ATOMICITY_MODE``.
+
+    Value handled by ``CacheAtomicityMode``.
+    """
+    prop_data_class = CacheAtomicityMode
+    prop_code = PROP_CACHE_ATOMICITY_MODE
+
+
+class PropBackupsNumber(PropBase):
+    """Cache property ``PROP_BACKUPS_NUMBER``; value handled by ``Int``."""
+    prop_data_class = Int
+    prop_code = PROP_BACKUPS_NUMBER
+
+
+class PropWriteSynchronizationMode(PropBase):
+    """Cache property ``PROP_WRITE_SYNCHRONIZATION_MODE``.
+
+    Value handled by ``WriteSynchronizationMode``.
+    """
+    prop_data_class = WriteSynchronizationMode
+    prop_code = PROP_WRITE_SYNCHRONIZATION_MODE
+
+
+class PropCopyOnRead(PropBase):
+    """Cache property ``PROP_COPY_ON_READ``; value handled by ``Bool``."""
+    prop_data_class = Bool
+    prop_code = PROP_COPY_ON_READ
+
+
+class PropReadFromBackup(PropBase):
+    """Cache property ``PROP_READ_FROM_BACKUP``.
+
+    Value handled by ``Bool``.
+    """
+    prop_data_class = Bool
+    prop_code = PROP_READ_FROM_BACKUP
+
+
+class PropDataRegionName(PropBase):
+    """Cache property ``PROP_DATA_REGION_NAME``.
+
+    Value handled by ``String``.
+    """
+    prop_data_class = String
+    prop_code = PROP_DATA_REGION_NAME
+
+
+class PropIsOnheapCacheEnabled(PropBase):
+    """Cache property ``PROP_IS_ONHEAP_CACHE_ENABLED``.
+
+    Value handled by ``Bool``.
+    """
+    prop_data_class = Bool
+    prop_code = PROP_IS_ONHEAP_CACHE_ENABLED
+
+
+class PropQueryEntities(PropBase):
+    """Cache property ``PROP_QUERY_ENTITIES``.
+
+    Value handled by ``QueryEntities``.
+    """
+    prop_data_class = QueryEntities
+    prop_code = PROP_QUERY_ENTITIES
+
+
+class PropQueryParallelism(PropBase):
+    """Cache property ``PROP_QUERY_PARALLELISM``.
+
+    Value handled by ``Int``.
+    """
+    prop_data_class = Int
+    prop_code = PROP_QUERY_PARALLELISM
+
+
+class PropQueryDetailMetricSize(PropBase):
+    """Cache property ``PROP_QUERY_DETAIL_METRIC_SIZE``.
+
+    Value handled by ``Int``.
+    """
+    prop_data_class = Int
+    prop_code = PROP_QUERY_DETAIL_METRIC_SIZE
+
+
+class PropSQLSchema(PropBase):
+    """Cache property ``PROP_SQL_SCHEMA``; value handled by ``String``."""
+    prop_data_class = String
+    prop_code = PROP_SQL_SCHEMA
+
+
+class PropSQLIndexInlineMaxSize(PropBase):
+    """Cache property ``PROP_SQL_INDEX_INLINE_MAX_SIZE``.
+
+    Value handled by ``Int``.
+    """
+    prop_data_class = Int
+    prop_code = PROP_SQL_INDEX_INLINE_MAX_SIZE
+
+
+class PropSqlEscapeAll(PropBase):
+    """Cache property ``PROP_SQL_ESCAPE_ALL``; value handled by ``Bool``."""
+    prop_data_class = Bool
+    prop_code = PROP_SQL_ESCAPE_ALL
+
+
+class PropMaxQueryIterators(PropBase):
+    """Cache property ``PROP_MAX_QUERY_ITERATORS``.
+
+    Value handled by ``Int``.
+    """
+    prop_data_class = Int
+    prop_code = PROP_MAX_QUERY_ITERATORS
+
+
+class PropRebalanceMode(PropBase):
+    """Cache property ``PROP_REBALANCE_MODE``.
+
+    Value handled by ``RebalanceMode``.
+    """
+    prop_data_class = RebalanceMode
+    prop_code = PROP_REBALANCE_MODE
+
+
+class PropRebalanceDelay(PropBase):
+    """Cache property ``PROP_REBALANCE_DELAY``; value handled by ``Long``."""
+    prop_data_class = Long
+    prop_code = PROP_REBALANCE_DELAY
+
+
+class PropRebalanceTimeout(PropBase):
+    """Cache property ``PROP_REBALANCE_TIMEOUT``.
+
+    Value handled by ``Long``.
+    """
+    prop_data_class = Long
+    prop_code = PROP_REBALANCE_TIMEOUT
+
+
+class PropRebalanceBatchSize(PropBase):
+    """Cache property ``PROP_REBALANCE_BATCH_SIZE``.
+
+    Value handled by ``Int``.
+    """
+    prop_data_class = Int
+    prop_code = PROP_REBALANCE_BATCH_SIZE
+
+
+class PropRebalanceBatchesPrefetchCount(PropBase):
+    """Cache property ``PROP_REBALANCE_BATCHES_PREFETCH_COUNT``.
+
+    Value handled by ``Long``.
+    """
+    prop_data_class = Long
+    prop_code = PROP_REBALANCE_BATCHES_PREFETCH_COUNT
+
+
+class PropRebalanceOrder(PropBase):
+    """Cache property ``PROP_REBALANCE_ORDER``; value handled by ``Int``."""
+    prop_data_class = Int
+    prop_code = PROP_REBALANCE_ORDER
+
+
+class PropRebalanceThrottle(PropBase):
+    """Cache property ``PROP_REBALANCE_THROTTLE``.
+
+    Value handled by ``Long``.
+    """
+    prop_data_class = Long
+    prop_code = PROP_REBALANCE_THROTTLE
+
+
+class PropGroupName(PropBase):
+    """Cache property ``PROP_GROUP_NAME``; value handled by ``String``."""
+    prop_data_class = String
+    prop_code = PROP_GROUP_NAME
+
+
+class PropCacheKeyConfiguration(PropBase):
+    """Cache property ``PROP_CACHE_KEY_CONFIGURATION``.
+
+    Value handled by ``CacheKeyConfiguration``.
+    """
+    prop_data_class = CacheKeyConfiguration
+    prop_code = PROP_CACHE_KEY_CONFIGURATION
+
+
+class PropDefaultLockTimeout(PropBase):
+    """Cache property ``PROP_DEFAULT_LOCK_TIMEOUT``.
+
+    Value handled by ``Long``.
+    """
+    prop_data_class = Long
+    prop_code = PROP_DEFAULT_LOCK_TIMEOUT
+
+
+class PropMaxConcurrentAsyncOperation(PropBase):
+    """Cache property ``PROP_MAX_CONCURRENT_ASYNC_OPERATIONS``.
+
+    Value handled by ``Int``.
+    """
+    prop_data_class = Int
+    prop_code = PROP_MAX_CONCURRENT_ASYNC_OPERATIONS
+
+
+class PropPartitionLossPolicy(PropBase):
+    """Cache property ``PROP_PARTITION_LOSS_POLICY``.
+
+    Value handled by ``PartitionLossPolicy``.
+    """
+    prop_data_class = PartitionLossPolicy
+    prop_code = PROP_PARTITION_LOSS_POLICY
+
+
+class PropEagerTTL(PropBase):
+    """Cache property ``PROP_EAGER_TTL``; value handled by ``Bool``."""
+    prop_data_class = Bool
+    prop_code = PROP_EAGER_TTL
+
+
+class PropStatisticsEnabled(PropBase):
+    """Cache property ``PROP_STATISTICS_ENABLED``.
+
+    Value handled by ``Bool``.
+    """
+    prop_data_class = Bool
+    prop_code = PROP_STATISTICS_ENABLED
+
+
+class AnyProperty(PropBase):
+    """
+    Property whose concrete class is not fixed in advance.
+
+    It cannot be serialized; on conversion to Python the concrete class
+    is looked up from the ``prop_code`` of the ctypes object.
+    """
+
+    @classmethod
+    def from_python(cls, value):
+        """
+        Always fail: a concrete property class is needed to serialize.
+
+        :raises Exception: unconditionally.
+        """
+        message = (
+            'You must choose a certain type '
+            'for your cache configuration property'
+        )
+        raise Exception(message)
+
+    @classmethod
+    def to_python(cls, ctype_object, *args, **kwargs):
+        """
+        Convert `ctype_object` using the class that ``prop_map`` returns
+        for its ``prop_code``.
+
+        :raises KeyError: if ``prop_code`` is unknown to ``prop_map``.
+        """
+        # NOTE(review): the resolved class receives ``ctype_object.data``,
+        # while ``PropBase.to_python`` reads ``.data`` from its argument
+        # again -- confirm this nesting is intended.
+        resolved_class = prop_map(ctype_object.prop_code)
+        payload = ctype_object.data
+        return resolved_class.to_python(payload, *args, **kwargs)