You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by is...@apache.org on 2021/04/03 07:11:43 UTC
[ignite-python-thin-client] branch master updated: IGNITE-14465 Add
the ability to set and get cluster state
This is an automated email from the ASF dual-hosted git repository.
isapego pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-python-thin-client.git
The following commit(s) were added to refs/heads/master by this push:
new 7c1d0cc IGNITE-14465 Add the ability to set and get cluster state
7c1d0cc is described below
commit 7c1d0cc12fa724989b9bed6e2a14f54c61228d3a
Author: Igor Sapego <is...@apache.org>
AuthorDate: Sat Apr 3 10:10:47 2021 +0300
IGNITE-14465 Add the ability to set and get cluster state
This closes #27
---
pyignite/aio_client.py | 13 +++-
pyignite/aio_cluster.py | 56 ++++++++++++++
pyignite/api/cluster.py | 106 +++++++++++++++++++++++++++
pyignite/client.py | 33 ++++++---
pyignite/cluster.py | 56 ++++++++++++++
pyignite/connection/aio_connection.py | 19 +++--
pyignite/connection/bitmask_feature.py | 57 +++++++++++++++
pyignite/connection/connection.py | 46 +++++-------
pyignite/connection/handshake.py | 41 +++++++----
pyignite/connection/protocol_context.py | 100 +++++++++++++++++++++++++
pyignite/constants.py | 7 +-
pyignite/datatypes/cluster_state.py | 28 +++++++
pyignite/exceptions.py | 18 ++++-
pyignite/queries/op_codes.py | 5 +-
pyignite/queries/query.py | 4 +-
pyignite/queries/response.py | 7 +-
pyignite/stream/aio_cluster.py | 53 ++++++++++++++
tests/config/ignite-config.xml.jinja2 | 5 +-
tests/custom/test_cluster.py | 125 ++++++++++++++++++++++++++++++++
tests/util.py | 20 +++--
20 files changed, 716 insertions(+), 83 deletions(-)
diff --git a/pyignite/aio_client.py b/pyignite/aio_client.py
index 5e64450..1870878 100644
--- a/pyignite/aio_client.py
+++ b/pyignite/aio_client.py
@@ -17,6 +17,7 @@ import random
from itertools import chain
from typing import Iterable, Type, Union, Any, Dict
+from .aio_cluster import AioCluster
from .api import cache_get_node_partitions_async
from .api.binary import get_binary_type_async, put_binary_type_async
from .api.cache_config import cache_get_names_async
@@ -92,7 +93,7 @@ class AioClient(BaseClient):
if not self.partition_aware:
try:
- if self.protocol_version is None:
+ if self.protocol_context is None:
# open connection before adding to the pool
await conn.connect()
@@ -120,7 +121,7 @@ class AioClient(BaseClient):
await asyncio.gather(*reconnect_coro, return_exceptions=True)
- if self.protocol_version is None:
+ if self.protocol_context is None:
raise ReconnectError('Can not connect.')
async def close(self):
@@ -460,3 +461,11 @@ class AioClient(BaseClient):
return AioSqlFieldsCursor(self, c_id, query_str, page_size, query_args, schema, statement_type,
distributed_joins, local, replicated_only, enforce_join_order, collocated,
lazy, include_field_names, max_rows, timeout)
+
+ def get_cluster(self) -> 'AioCluster':
+ """
+ Gets client cluster facade.
+
+ :return: AioClient cluster facade.
+ """
+ return AioCluster(self)
diff --git a/pyignite/aio_cluster.py b/pyignite/aio_cluster.py
new file mode 100644
index 0000000..6d76125
--- /dev/null
+++ b/pyignite/aio_cluster.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module contains `AioCluster` that lets you get info and change state of the
+whole cluster asynchronously.
+"""
+from pyignite.api.cluster import cluster_get_state_async, cluster_set_state_async
+from pyignite.exceptions import ClusterError
+from pyignite.utils import status_to_exception
+
+
+class AioCluster:
+ """
+ Ignite cluster abstraction. Users should never use this class directly,
+ but construct its instances with
+ :py:meth:`~pyignite.aio_client.AioClient.get_cluster` method instead.
+ """
+
+ def __init__(self, client: 'AioClient'):
+ self._client = client
+
+ @status_to_exception(ClusterError)
+ async def get_state(self):
+ """
+ Gets current cluster state.
+
+ :return: Current cluster state. This is one of ClusterState.INACTIVE,
+ ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
+ """
+ return await cluster_get_state_async(await self._client.random_node())
+
+ @status_to_exception(ClusterError)
+ async def set_state(self, state):
+ """
+ Changes current cluster state to the given.
+
+ Note: Deactivation clears in-memory caches (without persistence)
+ including the system caches.
+
+ :param state: New cluster state. This is one of ClusterState.INACTIVE,
+ ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
+ """
+ return await cluster_set_state_async(await self._client.random_node(), state)
diff --git a/pyignite/api/cluster.py b/pyignite/api/cluster.py
new file mode 100644
index 0000000..e134239
--- /dev/null
+++ b/pyignite/api/cluster.py
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from pyignite.api import APIResult
+from pyignite.connection import AioConnection, Connection
+from pyignite.datatypes import Byte
+from pyignite.exceptions import NotSupportedByClusterError
+from pyignite.queries import Query, query_perform
+from pyignite.queries.op_codes import OP_CLUSTER_GET_STATE, OP_CLUSTER_CHANGE_STATE
+
+
+def cluster_get_state(connection: 'Connection', query_id=None) -> 'APIResult':
+ """
+ Get cluster state.
+
+ :param connection: Connection to use,
+ :param query_id: (optional) a value generated by client and returned as-is
+ in response.query_id. When the parameter is omitted, a random value
+ is generated,
+ :return: API result data object. Contains zero status and a state
+ retrieved on success, non-zero status and an error description on failure.
+ """
+ return __cluster_get_state(connection, query_id)
+
+
+async def cluster_get_state_async(connection: 'AioConnection', query_id=None) -> 'APIResult':
+ """
+ Async version of cluster_get_state
+ """
+ return await __cluster_get_state(connection, query_id)
+
+
+def __post_process_get_state(result):
+ if result.status == 0:
+ result.value = result.value['state']
+ return result
+
+
+def __cluster_get_state(connection, query_id):
+ if not connection.protocol_context.is_cluster_api_supported():
+ raise NotSupportedByClusterError('Cluster API is not supported by the cluster')
+
+ query_struct = Query(OP_CLUSTER_GET_STATE, query_id=query_id)
+ return query_perform(
+ query_struct, connection,
+ response_config=[('state', Byte)],
+ post_process_fun=__post_process_get_state
+ )
+
+
+def cluster_set_state(connection: 'Connection', state: int, query_id=None) -> 'APIResult':
+ """
+ Set cluster state.
+
+ :param connection: Connection to use,
+ :param state: State to set,
+ :param query_id: (optional) a value generated by client and returned as-is
+ in response.query_id. When the parameter is omitted, a random value
+ is generated,
+ :return: API result data object. Contains zero status if a value
+ is written, non-zero status and an error description otherwise.
+ """
+ return __cluster_set_state(connection, state, query_id)
+
+
+async def cluster_set_state_async(connection: 'AioConnection', state: int, query_id=None) -> 'APIResult':
+ """
+ Async version of cluster_set_state
+ """
+ return await __cluster_set_state(connection, state, query_id)
+
+
+def __post_process_set_state(result):
+ if result.status == 0:
+ result.value = result.value['state']
+ return result
+
+
+def __cluster_set_state(connection, state, query_id):
+ if not connection.protocol_context.is_cluster_api_supported():
+ raise NotSupportedByClusterError('Cluster API is not supported by the cluster')
+
+ query_struct = Query(
+ OP_CLUSTER_CHANGE_STATE,
+ [
+ ('state', Byte)
+ ],
+ query_id=query_id
+ )
+ return query_perform(
+ query_struct, connection,
+ query_params={
+ 'state': state,
+ }
+ )
diff --git a/pyignite/client.py b/pyignite/client.py
index 2f24c43..b7c4046 100644
--- a/pyignite/client.py
+++ b/pyignite/client.py
@@ -49,6 +49,7 @@ from typing import Iterable, Type, Union, Any, Dict
from .api import cache_get_node_partitions
from .api.binary import get_binary_type, put_binary_type
from .api.cache_config import cache_get_names
+from .cluster import Cluster
from .cursors import SqlFieldsCursor
from .cache import Cache, create_cache, get_cache, get_or_create_cache, BaseCache
from .connection import Connection
@@ -83,24 +84,23 @@ class BaseClient:
self._partition_aware = partition_aware
self.affinity_version = (0, 0)
self._affinity = {'version': self.affinity_version, 'partition_mapping': defaultdict(dict)}
- self._protocol_version = None
+ self._protocol_context = None
@property
- def protocol_version(self):
+ def protocol_context(self):
"""
- Returns the tuple of major, minor, and revision numbers of the used
- thin protocol version, or None, if no connection to the Ignite cluster
- was not yet established.
+ Returns protocol context, or None, if no connection to the Ignite
+ cluster was yet established.
This method is not a part of the public API. Unless you wish to
extend the `pyignite` capabilities (with additional testing, logging,
examining connections, et c.) you probably should not use it.
"""
- return self._protocol_version
+ return self._protocol_context
- @protocol_version.setter
- def protocol_version(self, value):
- self._protocol_version = value
+ @protocol_context.setter
+ def protocol_context(self, value):
+ self._protocol_context = value
@property
def partition_aware(self):
@@ -108,7 +108,8 @@ class BaseClient:
@property
def partition_awareness_supported_by_protocol(self):
- return self.protocol_version is not None and self.protocol_version >= (1, 4, 0)
+ return self.protocol_context is not None \
+ and self.protocol_context.is_partition_awareness_supported()
@property
def compact_footer(self) -> bool:
@@ -379,7 +380,7 @@ class Client(BaseClient):
conn = Connection(self, host, port, **self._connection_args)
try:
- if self.protocol_version is None or self.partition_aware:
+ if self.protocol_context is None or self.partition_aware:
# open connection before adding to the pool
conn.connect()
@@ -396,7 +397,7 @@ class Client(BaseClient):
self._nodes.append(conn)
- if self.protocol_version is None:
+ if self.protocol_context is None:
raise ReconnectError('Can not connect.')
def close(self):
@@ -727,3 +728,11 @@ class Client(BaseClient):
return SqlFieldsCursor(self, c_id, query_str, page_size, query_args, schema, statement_type, distributed_joins,
local, replicated_only, enforce_join_order, collocated, lazy, include_field_names,
max_rows, timeout)
+
+ def get_cluster(self) -> 'Cluster':
+ """
+ Gets client cluster facade.
+
+ :return: Client cluster facade.
+ """
+ return Cluster(self)
diff --git a/pyignite/cluster.py b/pyignite/cluster.py
new file mode 100644
index 0000000..f10afe4
--- /dev/null
+++ b/pyignite/cluster.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module contains `Cluster` that lets you get info and change state of the
+whole cluster.
+"""
+from pyignite.api.cluster import cluster_get_state, cluster_set_state
+from pyignite.exceptions import ClusterError
+from pyignite.utils import status_to_exception
+
+
+class Cluster:
+ """
+ Ignite cluster abstraction. Users should never use this class directly,
+ but construct its instances with
+ :py:meth:`~pyignite.client.Client.get_cluster` method instead.
+ """
+
+ def __init__(self, client: 'Client'):
+ self._client = client
+
+ @status_to_exception(ClusterError)
+ def get_state(self):
+ """
+ Gets current cluster state.
+
+ :return: Current cluster state. This is one of ClusterState.INACTIVE,
+ ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
+ """
+ return cluster_get_state(self._client.random_node)
+
+ @status_to_exception(ClusterError)
+ def set_state(self, state):
+ """
+ Changes current cluster state to the given.
+
+ Note: Deactivation clears in-memory caches (without persistence)
+ including the system caches.
+
+ :param state: New cluster state. This is one of ClusterState.INACTIVE,
+ ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
+ """
+ return cluster_set_state(self._client.random_node, state)
diff --git a/pyignite/connection/aio_connection.py b/pyignite/connection/aio_connection.py
index e5c11da..ce32592 100644
--- a/pyignite/connection/aio_connection.py
+++ b/pyignite/connection/aio_connection.py
@@ -36,9 +36,11 @@ from typing import Union
from pyignite.constants import PROTOCOLS, PROTOCOL_BYTE_ORDER
from pyignite.exceptions import HandshakeError, SocketError, connection_errors
+from .bitmask_feature import BitmaskFeature
from .connection import BaseConnection
from .handshake import HandshakeRequest, HandshakeResponse
+from .protocol_context import ProtocolContext
from .ssl import create_ssl_context
from ..stream import AioBinaryStream
@@ -112,27 +114,28 @@ class AioConnection(BaseConnection):
detecting_protocol = False
# choose highest version first
- if self.client.protocol_version is None:
+ if self.client.protocol_context is None:
detecting_protocol = True
- self.client.protocol_version = max(PROTOCOLS)
+ self.client.protocol_context = ProtocolContext(max(PROTOCOLS), BitmaskFeature.all_supported())
try:
result = await self._connect_version()
except HandshakeError as e:
if e.expected_version in PROTOCOLS:
- self.client.protocol_version = e.expected_version
+ self.client.protocol_context.version = e.expected_version
result = await self._connect_version()
else:
raise e
except connection_errors:
# restore undefined protocol version
if detecting_protocol:
- self.client.protocol_version = None
+ self.client.protocol_context = None
raise
# connection is ready for end user
+ features = BitmaskFeature.from_array(result.get('features', None))
+ self.client.protocol_context.features = features
self.uuid = result.get('node_uuid', None) # version-specific (1.4+)
-
self.failed = False
return result
@@ -145,10 +148,10 @@ class AioConnection(BaseConnection):
ssl_context = create_ssl_context(self.ssl_params)
self._reader, self._writer = await asyncio.open_connection(self.host, self.port, ssl=ssl_context)
- protocol_version = self.client.protocol_version
+ protocol_context = self.client.protocol_context
hs_request = HandshakeRequest(
- protocol_version,
+ protocol_context,
self.username,
self.password
)
@@ -158,7 +161,7 @@ class AioConnection(BaseConnection):
await self._send(stream.getbuffer(), reconnect=False)
with AioBinaryStream(self.client, await self._recv(reconnect=False)) as stream:
- hs_response = await HandshakeResponse.parse_async(stream, self.protocol_version)
+ hs_response = await HandshakeResponse.parse_async(stream, self.protocol_context)
if hs_response.op_code == 0:
self._close()
diff --git a/pyignite/connection/bitmask_feature.py b/pyignite/connection/bitmask_feature.py
new file mode 100644
index 0000000..80d51ad
--- /dev/null
+++ b/pyignite/connection/bitmask_feature.py
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from enum import IntFlag
+from typing import Optional
+
+from pyignite.constants import PROTOCOL_BYTE_ORDER
+
+
+class BitmaskFeature(IntFlag):
+ CLUSTER_API = 1 << 2
+
+ def __bytes__(self) -> bytes:
+ """
+ Convert feature flags to a bytes bitmask.
+
+ :return: Bitmask as bytes.
+ """
+ full_bytes = self.bit_length() // 8 + 1
+ return self.to_bytes(full_bytes, byteorder=PROTOCOL_BYTE_ORDER)
+
+ @staticmethod
+ def all_supported() -> 'BitmaskFeature':
+ """
+ Get all supported features.
+
+ :return: All supported features.
+ """
+ supported = BitmaskFeature(0)
+ for feature in BitmaskFeature:
+ supported |= feature
+ return supported
+
+ @staticmethod
+ def from_array(features_array: bytes) -> Optional['BitmaskFeature']:
+ """
+ Get features from bytearray.
+
+ :param features_array: Feature bitmask as array,
+ :return: Return features.
+ """
+ if features_array is None:
+ return None
+ return BitmaskFeature.from_bytes(features_array, byteorder=PROTOCOL_BYTE_ORDER)
diff --git a/pyignite/connection/connection.py b/pyignite/connection/connection.py
index 901cb56..7d5778c 100644
--- a/pyignite/connection/connection.py
+++ b/pyignite/connection/connection.py
@@ -13,29 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
from collections import OrderedDict
import socket
from typing import Union
from pyignite.constants import PROTOCOLS, IGNITE_DEFAULT_HOST, IGNITE_DEFAULT_PORT, PROTOCOL_BYTE_ORDER
from pyignite.exceptions import HandshakeError, SocketError, connection_errors, AuthenticationError
+from .bitmask_feature import BitmaskFeature
from .handshake import HandshakeRequest, HandshakeResponse
+from .protocol_context import ProtocolContext
from .ssl import wrap, check_ssl_params
from ..stream import BinaryStream
@@ -83,19 +70,18 @@ class BaseConnection:
return '{}:{}'.format(self.host or '?', self.port or '?')
@property
- def protocol_version(self):
+ def protocol_context(self):
"""
- Returns the tuple of major, minor, and revision numbers of the used
- thin protocol version, or None, if no connection to the Ignite cluster
- was yet established.
+ Returns protocol context, or None, if no connection to the Ignite
+ cluster was yet established.
"""
- return self.client.protocol_version
+ return self.client.protocol_context
def _process_handshake_error(self, response):
error_text = f'Handshake error: {response.message}'
# if handshake fails for any reason other than protocol mismatch
# (i.e. authentication error), server version is 0.0.0
- protocol_version = self.client.protocol_version
+ protocol_version = self.client.protocol_context.version
server_version = (response.version_major, response.version_minor, response.version_patch)
if any(server_version):
@@ -118,7 +104,7 @@ class Connection(BaseConnection):
* binary protocol connector. Encapsulates handshake and failover reconnection.
"""
- def __init__(self, client: 'Client', host: str, port: int, timeout: float = 2.0,
+ def __init__(self, client: 'Client', host: str, port: int, timeout: float = None,
username: str = None, password: str = None, **ssl_params):
"""
Initialize connection.
@@ -180,25 +166,27 @@ class Connection(BaseConnection):
detecting_protocol = False
# choose highest version first
- if self.client.protocol_version is None:
+ if self.client.protocol_context is None:
detecting_protocol = True
- self.client.protocol_version = max(PROTOCOLS)
+ self.client.protocol_context = ProtocolContext(max(PROTOCOLS), BitmaskFeature.all_supported())
try:
result = self._connect_version()
except HandshakeError as e:
if e.expected_version in PROTOCOLS:
- self.client.protocol_version = e.expected_version
+ self.client.protocol_context.version = e.expected_version
result = self._connect_version()
else:
raise e
except connection_errors:
# restore undefined protocol version
if detecting_protocol:
- self.client.protocol_version = None
+ self.client.protocol_context = None
raise
# connection is ready for end user
+ features = BitmaskFeature.from_array(result.get('features', None))
+ self.client.protocol_context.features = features
self.uuid = result.get('node_uuid', None) # version-specific (1.4+)
self.failed = False
return result
@@ -214,10 +202,10 @@ class Connection(BaseConnection):
self._socket = wrap(self._socket, self.ssl_params)
self._socket.connect((self.host, self.port))
- protocol_version = self.client.protocol_version
+ protocol_context = self.client.protocol_context
hs_request = HandshakeRequest(
- protocol_version,
+ protocol_context,
self.username,
self.password
)
@@ -227,7 +215,7 @@ class Connection(BaseConnection):
self.send(stream.getbuffer(), reconnect=False)
with BinaryStream(self.client, self.recv(reconnect=False)) as stream:
- hs_response = HandshakeResponse.parse(stream, self.protocol_version)
+ hs_response = HandshakeResponse.parse(stream, self.protocol_context)
if hs_response.op_code == 0:
self.close()
diff --git a/pyignite/connection/handshake.py b/pyignite/connection/handshake.py
index 0b0fe50..af7bdb3 100644
--- a/pyignite/connection/handshake.py
+++ b/pyignite/connection/handshake.py
@@ -13,9 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import Optional, Tuple
+from typing import Optional
-from pyignite.datatypes import Byte, Int, Short, String, UUIDObject
+from pyignite.connection.protocol_context import ProtocolContext
+from pyignite.datatypes import Byte, Int, Short, String, UUIDObject, ByteArrayObject
from pyignite.datatypes.internal import Struct
from pyignite.stream import READ_BACKWARD
@@ -27,10 +28,10 @@ class HandshakeRequest:
handshake_struct = None
username = None
password = None
- protocol_version = None
+ protocol_context = None
def __init__(
- self, protocol_version: Tuple[int, int, int],
+ self, protocol_context: 'ProtocolContext',
username: Optional[str] = None, password: Optional[str] = None
):
fields = [
@@ -41,7 +42,9 @@ class HandshakeRequest:
('version_patch', Short),
('client_code', Byte),
]
- self.protocol_version = protocol_version
+ self.protocol_context = protocol_context
+ if self.protocol_context.is_feature_flags_supported():
+ fields.append(('features', ByteArrayObject))
if username and password:
self.username = username
self.password = password
@@ -58,14 +61,19 @@ class HandshakeRequest:
await self.handshake_struct.from_python_async(stream, self.__create_handshake_data())
def __create_handshake_data(self):
+ version = self.protocol_context.version
handshake_data = {
'length': 8,
'op_code': OP_HANDSHAKE,
- 'version_major': self.protocol_version[0],
- 'version_minor': self.protocol_version[1],
- 'version_patch': self.protocol_version[2],
+ 'version_major': version[0],
+ 'version_minor': version[1],
+ 'version_patch': version[2],
'client_code': 2, # fixed value defined by protocol
}
+ if self.protocol_context.is_feature_flags_supported():
+ features = bytes(self.protocol_context.features)
+ handshake_data['features'] = features
+ handshake_data['length'] += 5 + len(features)
if self.username and self.password:
handshake_data.update({
'username': self.username,
@@ -96,12 +104,12 @@ class HandshakeResponse(dict):
return self.get(item)
@classmethod
- def parse(cls, stream, protocol_version):
+ def parse(cls, stream, protocol_context):
start_class = cls.__response_start.parse(stream)
start = stream.read_ctype(start_class, direction=READ_BACKWARD)
data = cls.__response_start.to_python(start)
- response_end = cls.__create_response_end(data, protocol_version)
+ response_end = cls.__create_response_end(data, protocol_context)
if response_end:
end_class = response_end.parse(stream)
end = stream.read_ctype(end_class, direction=READ_BACKWARD)
@@ -110,12 +118,12 @@ class HandshakeResponse(dict):
return cls(data)
@classmethod
- async def parse_async(cls, stream, protocol_version):
+ async def parse_async(cls, stream, protocol_context):
start_class = cls.__response_start.parse(stream)
start = stream.read_ctype(start_class, direction=READ_BACKWARD)
data = await cls.__response_start.to_python_async(start)
- response_end = cls.__create_response_end(data, protocol_version)
+ response_end = cls.__create_response_end(data, protocol_context)
if response_end:
end_class = await response_end.parse_async(stream)
end = stream.read_ctype(end_class, direction=READ_BACKWARD)
@@ -124,7 +132,7 @@ class HandshakeResponse(dict):
return cls(data)
@classmethod
- def __create_response_end(cls, start_data, protocol_version):
+ def __create_response_end(cls, start_data, protocol_context):
response_end = None
if start_data['op_code'] == 0:
response_end = Struct([
@@ -134,7 +142,12 @@ class HandshakeResponse(dict):
('message', String),
('client_status', Int)
])
- elif protocol_version >= (1, 4, 0):
+ elif protocol_context.is_feature_flags_supported():
+ response_end = Struct([
+ ('features', ByteArrayObject),
+ ('node_uuid', UUIDObject),
+ ])
+ elif protocol_context.is_partition_awareness_supported():
response_end = Struct([
('node_uuid', UUIDObject),
])
diff --git a/pyignite/connection/protocol_context.py b/pyignite/connection/protocol_context.py
new file mode 100644
index 0000000..54f5240
--- /dev/null
+++ b/pyignite/connection/protocol_context.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Tuple
+
+from pyignite.connection.bitmask_feature import BitmaskFeature
+
+
+class ProtocolContext:
+ """
+ Protocol context. Provides ability to easily check supported
+ protocol features.
+ """
+
+ def __init__(self, version: Tuple[int, int, int], features: BitmaskFeature = None):
+ self._version = version
+ self._features = features
+ self._ensure_consistency()
+
+ def __hash__(self):
+ return hash((self._version, self._features))
+
+ def __eq__(self, other):
+ return isinstance(other, ProtocolContext) and \
+ self.version == other.version and \
+ self.features == other.features
+
+ def _ensure_consistency(self):
+ if not self.is_feature_flags_supported():
+ self._features = None
+
+ @property
+ def version(self):
+ return getattr(self, '_version', None)
+
+ @version.setter
+ def version(self, version: Tuple[int, int, int]):
+ """
+ Set version.
+
+ This call may result in features being reset to None if the protocol
+ version does not support feature masks.
+
+ :param version: Version to set.
+ """
+ setattr(self, '_version', version)
+ self._ensure_consistency()
+
+ @property
+ def features(self):
+ return getattr(self, '_features', None)
+
+ @features.setter
+ def features(self, features: BitmaskFeature):
+ """
+ Try and set new feature set.
+
+ If features are not supported by the protocol, None is set as features
+ instead.
+
+ :param features: Features to set.
+ """
+ setattr(self, '_features', features)
+ self._ensure_consistency()
+
+ def is_partition_awareness_supported(self) -> bool:
+ """
+ Check whether partition awareness is supported by the current protocol.
+ """
+ return self.version >= (1, 4, 0)
+
+ def is_status_flags_supported(self) -> bool:
+ """
+ Check whether status flags are supported by the current protocol.
+ """
+ return self.version >= (1, 4, 0)
+
+ def is_feature_flags_supported(self) -> bool:
+ """
+ Check whether feature flags are supported by the current protocol.
+ """
+ return self.version >= (1, 7, 0)
+
+ def is_cluster_api_supported(self) -> bool:
+ """
+ Check whether cluster API is supported by the current protocol.
+ """
+ return self.features and BitmaskFeature.CLUSTER_API in self.features
diff --git a/pyignite/constants.py b/pyignite/constants.py
index 02f7124..c08a3ce 100644
--- a/pyignite/constants.py
+++ b/pyignite/constants.py
@@ -31,14 +31,17 @@ __all__ = [
]
PROTOCOLS = {
+ (1, 7, 0),
+ (1, 6, 0),
+ (1, 5, 0),
(1, 4, 0),
(1, 3, 0),
(1, 2, 0),
}
PROTOCOL_VERSION_MAJOR = 1
-PROTOCOL_VERSION_MINOR = 4
-PROTOCOL_VERSION_PATCH = 0
+PROTOCOL_VERSION_MINOR = 7
+PROTOCOL_VERSION_PATCH = 1
MAX_LONG = 9223372036854775807
MIN_LONG = -9223372036854775808
diff --git a/pyignite/datatypes/cluster_state.py b/pyignite/datatypes/cluster_state.py
new file mode 100644
index 0000000..863a1d2
--- /dev/null
+++ b/pyignite/datatypes/cluster_state.py
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from enum import IntEnum
+
+
+class ClusterState(IntEnum):
+ #: Cluster deactivated. Cache operations aren't allowed.
+ INACTIVE = 0
+
+ #: Cluster activated. All cache operations are allowed.
+ ACTIVE = 1
+
+ #: Cluster activated. Cache read operations are allowed, cache data change
+ #: operations aren't allowed.
+ ACTIVE_READ_ONLY = 2
diff --git a/pyignite/exceptions.py b/pyignite/exceptions.py
index 579aa29..215ccd0 100644
--- a/pyignite/exceptions.py
+++ b/pyignite/exceptions.py
@@ -65,7 +65,7 @@ class ParameterError(Exception):
class CacheError(Exception):
"""
- This exception is raised, whenever any remote Thin client operation
+ This exception is raised, whenever any remote Thin client cache operation
returns an error.
"""
pass
@@ -93,4 +93,20 @@ class SQLError(CacheError):
pass
+class ClusterError(Exception):
+ """
+ This exception is raised, whenever any remote Thin client cluster operation
+ returns an error.
+ """
+ pass
+
+
+class NotSupportedByClusterError(Exception):
+ """
+ This exception is raised whenever the cluster does not support a specific
+ operation, probably because it is outdated.
+ """
+ pass
+
+
connection_errors = (IOError, OSError, EOFError)
diff --git a/pyignite/queries/op_codes.py b/pyignite/queries/op_codes.py
index 7372713..c152f7c 100644
--- a/pyignite/queries/op_codes.py
+++ b/pyignite/queries/op_codes.py
@@ -61,7 +61,10 @@ OP_QUERY_SQL_CURSOR_GET_PAGE = 2003
OP_QUERY_SQL_FIELDS = 2004
OP_QUERY_SQL_FIELDS_CURSOR_GET_PAGE = 2005
-P_GET_BINARY_TYPE_NAME = 3000
+OP_GET_BINARY_TYPE_NAME = 3000
OP_REGISTER_BINARY_TYPE_NAME = 3001
OP_GET_BINARY_TYPE = 3002
OP_PUT_BINARY_TYPE = 3003
+
+OP_CLUSTER_GET_STATE = 5000
+OP_CLUSTER_CHANGE_STATE = 5001
diff --git a/pyignite/queries/query.py b/pyignite/queries/query.py
index beea5d9..d9e6aaf 100644
--- a/pyignite/queries/query.py
+++ b/pyignite/queries/query.py
@@ -124,7 +124,7 @@ class Query:
self.from_python(stream, query_params)
response_data = conn.request(stream.getbuffer())
- response_struct = self.response_type(protocol_version=conn.protocol_version,
+ response_struct = self.response_type(protocol_context=conn.protocol_context,
following=response_config, **kwargs)
with BinaryStream(conn.client, response_data) as stream:
@@ -156,7 +156,7 @@ class Query:
await self.from_python_async(stream, query_params)
data = await conn.request(stream.getbuffer())
- response_struct = self.response_type(protocol_version=conn.protocol_version,
+ response_struct = self.response_type(protocol_context=conn.protocol_context,
following=response_config, **kwargs)
with AioBinaryStream(conn.client, data) as stream:
diff --git a/pyignite/queries/response.py b/pyignite/queries/response.py
index 83a6e6a..6495802 100644
--- a/pyignite/queries/response.py
+++ b/pyignite/queries/response.py
@@ -19,6 +19,7 @@ import attr
from collections import OrderedDict
import ctypes
+from pyignite.connection.protocol_context import ProtocolContext
from pyignite.constants import RHF_TOPOLOGY_CHANGED, RHF_ERROR
from pyignite.datatypes import AnyDataObject, Bool, Int, Long, String, StringArray, Struct
from pyignite.datatypes.binary import body_struct, enum_struct, schema_struct
@@ -29,7 +30,7 @@ from pyignite.stream import READ_BACKWARD
@attr.s
class Response:
following = attr.ib(type=list, factory=list)
- protocol_version = attr.ib(type=tuple, factory=tuple)
+ protocol_context = attr.ib(type=type(ProtocolContext), default=None)
_response_header = None
_response_class_name = 'Response'
@@ -44,7 +45,7 @@ class Response:
('query_id', ctypes.c_longlong),
]
- if self.protocol_version and self.protocol_version >= (1, 4, 0):
+ if self.protocol_context.is_status_flags_supported():
fields.append(('flags', ctypes.c_short))
else:
fields.append(('status_code', ctypes.c_int),)
@@ -68,7 +69,7 @@ class Response:
fields = []
has_error = False
- if self.protocol_version and self.protocol_version >= (1, 4, 0):
+ if self.protocol_context.is_status_flags_supported():
if header.flags & RHF_TOPOLOGY_CHANGED:
fields = [
('affinity_version', ctypes.c_longlong),
diff --git a/pyignite/stream/aio_cluster.py b/pyignite/stream/aio_cluster.py
new file mode 100644
index 0000000..8a2f98e
--- /dev/null
+++ b/pyignite/stream/aio_cluster.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module contains `AioCluster` that lets you get info and change state of the
+whole cluster.
+"""
+from pyignite import AioClient
+from pyignite.api.cluster import cluster_get_state_async, cluster_set_state_async
+
+
+class AioCluster:
+ """
+ Ignite cluster abstraction. Users should never use this class directly,
+ but construct its instances with
+ :py:meth:`~pyignite.aio_client.AioClient.get_cluster` method instead.
+ """
+
+ def __init__(self, client: 'AioClient'):
+ self._client = client
+
+ async def get_state(self):
+ """
+ Gets current cluster state.
+
+ :return: Current cluster state. This is one of ClusterState.INACTIVE,
+ ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
+ """
+ return await cluster_get_state_async(await self._client.random_node())
+
+ async def set_state(self, state):
+ """
+ Changes the current cluster state to the given one.
+
+ Note: Deactivation clears in-memory caches (without persistence)
+ including the system caches.
+
+ :param state: New cluster state. This is one of ClusterState.INACTIVE,
+ ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
+ """
+ return await cluster_set_state_async(await self._client.random_node(), state)
diff --git a/tests/config/ignite-config.xml.jinja2 b/tests/config/ignite-config.xml.jinja2
index 2bf5129..325a581 100644
--- a/tests/config/ignite-config.xml.jinja2
+++ b/tests/config/ignite-config.xml.jinja2
@@ -31,7 +31,7 @@
<bean class="org.apache.ignite.configuration.DataStorageConfiguration">
<property name="defaultDataRegionConfiguration">
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
- {% if use_auth %}
+ {% if use_persistence %}
<property name="persistenceEnabled" value="true"/>
{% endif %}
</bean>
@@ -51,9 +51,8 @@
<property name="authenticationEnabled" value="true"/>
{% endif %}
-
{% if use_ssl %}
- <property name="connectorConfiguration"><null/></property>
+ <property name="connectorConfiguration"><null/></property>
{% endif %}
<property name="clientConnectorConfiguration">
diff --git a/tests/custom/test_cluster.py b/tests/custom/test_cluster.py
new file mode 100644
index 0000000..e82e238
--- /dev/null
+++ b/tests/custom/test_cluster.py
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from pyignite import Client, AioClient
+from pyignite.exceptions import CacheError
+from tests.util import clear_ignite_work_dir, start_ignite_gen
+
+from pyignite.datatypes.cluster_state import ClusterState
+
+
+@pytest.fixture(params=['with-persistence', 'without-persistence'])
+def with_persistence(request):
+ yield request.param == 'with-persistence'
+
+
+@pytest.fixture(autouse=True)
+def cleanup():
+ clear_ignite_work_dir()
+ yield None
+ clear_ignite_work_dir()
+
+
+@pytest.fixture(autouse=True)
+def server1(with_persistence, cleanup):
+ yield from start_ignite_gen(idx=1, use_persistence=with_persistence)
+
+
+@pytest.fixture(autouse=True)
+def server2(with_persistence, cleanup):
+ yield from start_ignite_gen(idx=2, use_persistence=with_persistence)
+
+
+def test_cluster_set_active(with_persistence):
+ key = 42
+ val = 42
+ start_state = ClusterState.INACTIVE if with_persistence else ClusterState.ACTIVE
+
+ client = Client()
+ with client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802)]):
+ cluster = client.get_cluster()
+ assert cluster.get_state() == start_state
+
+ cluster.set_state(ClusterState.ACTIVE)
+ assert cluster.get_state() == ClusterState.ACTIVE
+
+ cache = client.get_or_create_cache("test_cache")
+ cache.put(key, val)
+ assert cache.get(key) == val
+
+ cluster.set_state(ClusterState.ACTIVE_READ_ONLY)
+ assert cluster.get_state() == ClusterState.ACTIVE_READ_ONLY
+
+ assert cache.get(key) == val
+ with pytest.raises(CacheError):
+ cache.put(key, val + 1)
+
+ cluster.set_state(ClusterState.INACTIVE)
+ assert cluster.get_state() == ClusterState.INACTIVE
+
+ with pytest.raises(CacheError):
+ cache.get(key)
+
+ with pytest.raises(CacheError):
+ cache.put(key, val + 1)
+
+ cluster.set_state(ClusterState.ACTIVE)
+ assert cluster.get_state() == ClusterState.ACTIVE
+
+ cache.put(key, val + 2)
+ assert cache.get(key) == val + 2
+
+
+@pytest.mark.asyncio
+async def test_cluster_set_active_async(with_persistence):
+ key = 42
+ val = 42
+ start_state = ClusterState.INACTIVE if with_persistence else ClusterState.ACTIVE
+
+ client = AioClient()
+ async with client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802)]):
+ cluster = client.get_cluster()
+ assert await cluster.get_state() == start_state
+
+ await cluster.set_state(ClusterState.ACTIVE)
+ assert await cluster.get_state() == ClusterState.ACTIVE
+
+ cache = await client.get_or_create_cache("test_cache")
+ await cache.put(key, val)
+ assert await cache.get(key) == val
+
+ await cluster.set_state(ClusterState.ACTIVE_READ_ONLY)
+ assert await cluster.get_state() == ClusterState.ACTIVE_READ_ONLY
+
+ assert await cache.get(key) == val
+ with pytest.raises(CacheError):
+ await cache.put(key, val + 1)
+
+ await cluster.set_state(ClusterState.INACTIVE)
+ assert await cluster.get_state() == ClusterState.INACTIVE
+
+ with pytest.raises(CacheError):
+ await cache.get(key)
+
+ with pytest.raises(CacheError):
+ await cache.put(key, val + 1)
+
+ await cluster.set_state(ClusterState.ACTIVE)
+ assert await cluster.get_state() == ClusterState.ACTIVE
+
+ await cache.put(key, val + 2)
+ assert await cache.get(key) == val + 2
diff --git a/tests/util.py b/tests/util.py
index 5651739..af3b70e 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -155,7 +155,7 @@ def create_config_file(tpl_name, file_name, **kwargs):
f.write(template.render(**kwargs))
-def start_ignite(idx=1, debug=False, use_ssl=False, use_auth=False):
+def start_ignite(idx=1, debug=False, use_ssl=False, use_auth=False, use_persistence=False):
clear_logs(idx)
runner = get_ignite_runner()
@@ -166,8 +166,16 @@ def start_ignite(idx=1, debug=False, use_ssl=False, use_auth=False):
env["JVM_OPTS"] = "-Djava.net.preferIPv4Stack=true -Xdebug -Xnoagent -Djava.compiler=NONE " \
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 "
- params = {'ignite_instance_idx': str(idx), 'ignite_client_port': 10800 + idx, 'use_ssl': use_ssl,
- 'use_auth': use_auth}
+ if use_auth:
+ use_persistence = True
+
+ params = {
+ 'ignite_instance_idx': str(idx),
+ 'ignite_client_port': 10800 + idx,
+ 'use_ssl': use_ssl,
+ 'use_auth': use_auth,
+ 'use_persistence': use_persistence,
+ }
create_config_file('log4j.xml.jinja2', f'log4j-{idx}.xml', **params)
create_config_file('ignite-config.xml.jinja2', f'ignite-config-{idx}.xml', **params)
@@ -177,7 +185,7 @@ def start_ignite(idx=1, debug=False, use_ssl=False, use_auth=False):
srv = subprocess.Popen(ignite_cmd, env=env, cwd=get_test_dir())
- started = wait_for_condition(lambda: check_server_started(idx), timeout=30)
+ started = wait_for_condition(lambda: check_server_started(idx), timeout=60)
if started:
return srv
@@ -185,8 +193,8 @@ def start_ignite(idx=1, debug=False, use_ssl=False, use_auth=False):
raise Exception("Failed to start Ignite: timeout while trying to connect")
-def start_ignite_gen(idx=1, use_ssl=False, use_auth=False):
- srv = start_ignite(idx, use_ssl=use_ssl, use_auth=use_auth)
+def start_ignite_gen(idx=1, use_ssl=False, use_auth=False, use_persistence=False):
+ srv = start_ignite(idx, use_ssl=use_ssl, use_auth=use_auth, use_persistence=use_persistence)
try:
yield srv
finally: