You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by is...@apache.org on 2021/04/03 07:11:43 UTC

[ignite-python-thin-client] branch master updated: IGNITE-14465 Add the ability to set and get cluster state

This is an automated email from the ASF dual-hosted git repository.

isapego pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-python-thin-client.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c1d0cc  IGNITE-14465 Add the ability to set and get cluster state
7c1d0cc is described below

commit 7c1d0cc12fa724989b9bed6e2a14f54c61228d3a
Author: Igor Sapego <is...@apache.org>
AuthorDate: Sat Apr 3 10:10:47 2021 +0300

    IGNITE-14465 Add the ability to set and get cluster state
    
    This closes #27
---
 pyignite/aio_client.py                  |  13 +++-
 pyignite/aio_cluster.py                 |  56 ++++++++++++++
 pyignite/api/cluster.py                 | 106 +++++++++++++++++++++++++++
 pyignite/client.py                      |  33 ++++++---
 pyignite/cluster.py                     |  56 ++++++++++++++
 pyignite/connection/aio_connection.py   |  19 +++--
 pyignite/connection/bitmask_feature.py  |  57 +++++++++++++++
 pyignite/connection/connection.py       |  46 +++++-------
 pyignite/connection/handshake.py        |  41 +++++++----
 pyignite/connection/protocol_context.py | 100 +++++++++++++++++++++++++
 pyignite/constants.py                   |   7 +-
 pyignite/datatypes/cluster_state.py     |  28 +++++++
 pyignite/exceptions.py                  |  18 ++++-
 pyignite/queries/op_codes.py            |   5 +-
 pyignite/queries/query.py               |   4 +-
 pyignite/queries/response.py            |   7 +-
 pyignite/stream/aio_cluster.py          |  53 ++++++++++++++
 tests/config/ignite-config.xml.jinja2   |   5 +-
 tests/custom/test_cluster.py            | 125 ++++++++++++++++++++++++++++++++
 tests/util.py                           |  20 +++--
 20 files changed, 716 insertions(+), 83 deletions(-)

diff --git a/pyignite/aio_client.py b/pyignite/aio_client.py
index 5e64450..1870878 100644
--- a/pyignite/aio_client.py
+++ b/pyignite/aio_client.py
@@ -17,6 +17,7 @@ import random
 from itertools import chain
 from typing import Iterable, Type, Union, Any, Dict
 
+from .aio_cluster import AioCluster
 from .api import cache_get_node_partitions_async
 from .api.binary import get_binary_type_async, put_binary_type_async
 from .api.cache_config import cache_get_names_async
@@ -92,7 +93,7 @@ class AioClient(BaseClient):
 
             if not self.partition_aware:
                 try:
-                    if self.protocol_version is None:
+                    if self.protocol_context is None:
                         # open connection before adding to the pool
                         await conn.connect()
 
@@ -120,7 +121,7 @@ class AioClient(BaseClient):
 
             await asyncio.gather(*reconnect_coro, return_exceptions=True)
 
-        if self.protocol_version is None:
+        if self.protocol_context is None:
             raise ReconnectError('Can not connect.')
 
     async def close(self):
@@ -460,3 +461,11 @@ class AioClient(BaseClient):
         return AioSqlFieldsCursor(self, c_id, query_str, page_size, query_args, schema, statement_type,
                                   distributed_joins, local, replicated_only, enforce_join_order, collocated,
                                   lazy, include_field_names, max_rows, timeout)
+
+    def get_cluster(self) -> 'AioCluster':
+        """
+        Gets client cluster facade.
+
+        :return: AioClient cluster facade.
+        """
+        return AioCluster(self)
diff --git a/pyignite/aio_cluster.py b/pyignite/aio_cluster.py
new file mode 100644
index 0000000..6d76125
--- /dev/null
+++ b/pyignite/aio_cluster.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module contains `AioCluster` that lets you get info and change state of the
+whole cluster asynchronously.
+"""
+from pyignite.api.cluster import cluster_get_state_async, cluster_set_state_async
+from pyignite.exceptions import ClusterError
+from pyignite.utils import status_to_exception
+
+
+class AioCluster:
+    """
+    Ignite cluster abstraction. Users should never use this class directly,
+    but construct its instances with
+    :py:meth:`~pyignite.aio_client.AioClient.get_cluster` method instead.
+    """
+
+    def __init__(self, client: 'AioClient'):
+        self._client = client
+
+    @status_to_exception(ClusterError)
+    async def get_state(self):
+        """
+        Gets current cluster state.
+
+        :return: Current cluster state. This is one of ClusterState.INACTIVE,
+         ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
+        """
+        return await cluster_get_state_async(await self._client.random_node())
+
+    @status_to_exception(ClusterError)
+    async def set_state(self, state):
+        """
+        Changes current cluster state to the given.
+
+        Note: Deactivation clears in-memory caches (without persistence)
+         including the system caches.
+
+        :param state: New cluster state. This is one of ClusterState.INACTIVE,
+         ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
+        """
+        return await cluster_set_state_async(await self._client.random_node(), state)
diff --git a/pyignite/api/cluster.py b/pyignite/api/cluster.py
new file mode 100644
index 0000000..e134239
--- /dev/null
+++ b/pyignite/api/cluster.py
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from pyignite.api import APIResult
+from pyignite.connection import AioConnection, Connection
+from pyignite.datatypes import Byte
+from pyignite.exceptions import NotSupportedByClusterError
+from pyignite.queries import Query, query_perform
+from pyignite.queries.op_codes import OP_CLUSTER_GET_STATE, OP_CLUSTER_CHANGE_STATE
+
+
+def cluster_get_state(connection: 'Connection', query_id=None) -> 'APIResult':
+    """
+    Get cluster state.
+
+    :param connection: Connection to use,
+    :param query_id: (optional) a value generated by client and returned as-is
+     in response.query_id. When the parameter is omitted, a random value
+     is generated,
+    :return: API result data object. Contains zero status and a state
+     retrieved on success, non-zero status and an error description on failure.
+    """
+    return __cluster_get_state(connection, query_id)
+
+
+async def cluster_get_state_async(connection: 'AioConnection', query_id=None) -> 'APIResult':
+    """
+    Async version of cluster_get_state
+    """
+    return await __cluster_get_state(connection, query_id)
+
+
+def __post_process_get_state(result):
+    if result.status == 0:
+        result.value = result.value['state']
+    return result
+
+
+def __cluster_get_state(connection, query_id):
+    if not connection.protocol_context.is_cluster_api_supported():
+        raise NotSupportedByClusterError('Cluster API is not supported by the cluster')
+
+    query_struct = Query(OP_CLUSTER_GET_STATE, query_id=query_id)
+    return query_perform(
+        query_struct, connection,
+        response_config=[('state', Byte)],
+        post_process_fun=__post_process_get_state
+    )
+
+
+def cluster_set_state(connection: 'Connection', state: int, query_id=None) -> 'APIResult':
+    """
+    Set cluster state.
+
+    :param connection: Connection to use,
+    :param state: State to set,
+    :param query_id: (optional) a value generated by client and returned as-is
+     in response.query_id. When the parameter is omitted, a random value
+     is generated,
+    :return: API result data object. Contains zero status if the state
+     is changed, non-zero status and an error description otherwise.
+    """
+    return __cluster_set_state(connection, state, query_id)
+
+
+async def cluster_set_state_async(connection: 'AioConnection', state: int, query_id=None) -> 'APIResult':
+    """
+    Async version of cluster_set_state
+    """
+    return await __cluster_set_state(connection, state, query_id)
+
+
+def __post_process_set_state(result):
+    if result.status == 0:
+        result.value = result.value['state']
+    return result
+
+
+def __cluster_set_state(connection, state, query_id):
+    if not connection.protocol_context.is_cluster_api_supported():
+        raise NotSupportedByClusterError('Cluster API is not supported by the cluster')
+
+    query_struct = Query(
+        OP_CLUSTER_CHANGE_STATE,
+        [
+            ('state', Byte)
+        ],
+        query_id=query_id
+    )
+    return query_perform(
+        query_struct, connection,
+        query_params={
+            'state': state,
+        }
+    )
diff --git a/pyignite/client.py b/pyignite/client.py
index 2f24c43..b7c4046 100644
--- a/pyignite/client.py
+++ b/pyignite/client.py
@@ -49,6 +49,7 @@ from typing import Iterable, Type, Union, Any, Dict
 from .api import cache_get_node_partitions
 from .api.binary import get_binary_type, put_binary_type
 from .api.cache_config import cache_get_names
+from .cluster import Cluster
 from .cursors import SqlFieldsCursor
 from .cache import Cache, create_cache, get_cache, get_or_create_cache, BaseCache
 from .connection import Connection
@@ -83,24 +84,23 @@ class BaseClient:
         self._partition_aware = partition_aware
         self.affinity_version = (0, 0)
         self._affinity = {'version': self.affinity_version, 'partition_mapping': defaultdict(dict)}
-        self._protocol_version = None
+        self._protocol_context = None
 
     @property
-    def protocol_version(self):
+    def protocol_context(self):
         """
-        Returns the tuple of major, minor, and revision numbers of the used
-        thin protocol version, or None, if no connection to the Ignite cluster
-        was not yet established.
+        Returns the protocol context, or None if no connection to the Ignite
+        cluster has been established yet.
 
         This method is not a part of the public API. Unless you wish to
         extend the `pyignite` capabilities (with additional testing, logging,
         examining connections, et c.) you probably should not use it.
         """
-        return self._protocol_version
+        return self._protocol_context
 
-    @protocol_version.setter
-    def protocol_version(self, value):
-        self._protocol_version = value
+    @protocol_context.setter
+    def protocol_context(self, value):
+        self._protocol_context = value
 
     @property
     def partition_aware(self):
@@ -108,7 +108,8 @@ class BaseClient:
 
     @property
     def partition_awareness_supported_by_protocol(self):
-        return self.protocol_version is not None and self.protocol_version >= (1, 4, 0)
+        return self.protocol_context is not None \
+            and self.protocol_context.is_partition_awareness_supported()
 
     @property
     def compact_footer(self) -> bool:
@@ -379,7 +380,7 @@ class Client(BaseClient):
             conn = Connection(self, host, port, **self._connection_args)
 
             try:
-                if self.protocol_version is None or self.partition_aware:
+                if self.protocol_context is None or self.partition_aware:
                     # open connection before adding to the pool
                     conn.connect()
 
@@ -396,7 +397,7 @@ class Client(BaseClient):
 
             self._nodes.append(conn)
 
-        if self.protocol_version is None:
+        if self.protocol_context is None:
             raise ReconnectError('Can not connect.')
 
     def close(self):
@@ -727,3 +728,11 @@ class Client(BaseClient):
         return SqlFieldsCursor(self, c_id, query_str, page_size, query_args, schema, statement_type, distributed_joins,
                                local, replicated_only, enforce_join_order, collocated, lazy, include_field_names,
                                max_rows, timeout)
+
+    def get_cluster(self) -> 'Cluster':
+        """
+        Gets client cluster facade.
+
+        :return: Client cluster facade.
+        """
+        return Cluster(self)
diff --git a/pyignite/cluster.py b/pyignite/cluster.py
new file mode 100644
index 0000000..f10afe4
--- /dev/null
+++ b/pyignite/cluster.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module contains `Cluster` that lets you get info and change state of the
+whole cluster.
+"""
+from pyignite.api.cluster import cluster_get_state, cluster_set_state
+from pyignite.exceptions import ClusterError
+from pyignite.utils import status_to_exception
+
+
+class Cluster:
+    """
+    Ignite cluster abstraction. Users should never use this class directly,
+    but construct its instances with
+    :py:meth:`~pyignite.client.Client.get_cluster` method instead.
+    """
+
+    def __init__(self, client: 'Client'):
+        self._client = client
+
+    @status_to_exception(ClusterError)
+    def get_state(self):
+        """
+        Gets current cluster state.
+
+        :return: Current cluster state. This is one of ClusterState.INACTIVE,
+         ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
+        """
+        return cluster_get_state(self._client.random_node)
+
+    @status_to_exception(ClusterError)
+    def set_state(self, state):
+        """
+        Changes current cluster state to the given.
+
+        Note: Deactivation clears in-memory caches (without persistence)
+         including the system caches.
+
+        :param state: New cluster state. This is one of ClusterState.INACTIVE,
+         ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
+        """
+        return cluster_set_state(self._client.random_node, state)
diff --git a/pyignite/connection/aio_connection.py b/pyignite/connection/aio_connection.py
index e5c11da..ce32592 100644
--- a/pyignite/connection/aio_connection.py
+++ b/pyignite/connection/aio_connection.py
@@ -36,9 +36,11 @@ from typing import Union
 
 from pyignite.constants import PROTOCOLS, PROTOCOL_BYTE_ORDER
 from pyignite.exceptions import HandshakeError, SocketError, connection_errors
+from .bitmask_feature import BitmaskFeature
 from .connection import BaseConnection
 
 from .handshake import HandshakeRequest, HandshakeResponse
+from .protocol_context import ProtocolContext
 from .ssl import create_ssl_context
 from ..stream import AioBinaryStream
 
@@ -112,27 +114,28 @@ class AioConnection(BaseConnection):
         detecting_protocol = False
 
         # choose highest version first
-        if self.client.protocol_version is None:
+        if self.client.protocol_context is None:
             detecting_protocol = True
-            self.client.protocol_version = max(PROTOCOLS)
+            self.client.protocol_context = ProtocolContext(max(PROTOCOLS), BitmaskFeature.all_supported())
 
         try:
             result = await self._connect_version()
         except HandshakeError as e:
             if e.expected_version in PROTOCOLS:
-                self.client.protocol_version = e.expected_version
+                self.client.protocol_context.version = e.expected_version
                 result = await self._connect_version()
             else:
                 raise e
         except connection_errors:
             # restore undefined protocol version
             if detecting_protocol:
-                self.client.protocol_version = None
+                self.client.protocol_context = None
             raise
 
         # connection is ready for end user
+        features = BitmaskFeature.from_array(result.get('features', None))
+        self.client.protocol_context.features = features
         self.uuid = result.get('node_uuid', None)  # version-specific (1.4+)
-
         self.failed = False
         return result
 
@@ -145,10 +148,10 @@ class AioConnection(BaseConnection):
         ssl_context = create_ssl_context(self.ssl_params)
         self._reader, self._writer = await asyncio.open_connection(self.host, self.port, ssl=ssl_context)
 
-        protocol_version = self.client.protocol_version
+        protocol_context = self.client.protocol_context
 
         hs_request = HandshakeRequest(
-            protocol_version,
+            protocol_context,
             self.username,
             self.password
         )
@@ -158,7 +161,7 @@ class AioConnection(BaseConnection):
             await self._send(stream.getbuffer(), reconnect=False)
 
         with AioBinaryStream(self.client, await self._recv(reconnect=False)) as stream:
-            hs_response = await HandshakeResponse.parse_async(stream, self.protocol_version)
+            hs_response = await HandshakeResponse.parse_async(stream, self.protocol_context)
 
             if hs_response.op_code == 0:
                 self._close()
diff --git a/pyignite/connection/bitmask_feature.py b/pyignite/connection/bitmask_feature.py
new file mode 100644
index 0000000..80d51ad
--- /dev/null
+++ b/pyignite/connection/bitmask_feature.py
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from enum import IntFlag
+from typing import Optional
+
+from pyignite.constants import PROTOCOL_BYTE_ORDER
+
+
+class BitmaskFeature(IntFlag):
+    CLUSTER_API = 1 << 2
+
+    def __bytes__(self) -> bytes:
+        """
+        Convert feature flags array to bytearray bitmask.
+
+        :return: Bitmask as bytearray.
+        """
+        full_bytes = self.bit_length() // 8 + 1
+        return self.to_bytes(full_bytes, byteorder=PROTOCOL_BYTE_ORDER)
+
+    @staticmethod
+    def all_supported() -> 'BitmaskFeature':
+        """
+        Get all supported features.
+
+        :return: All supported features.
+        """
+        supported = BitmaskFeature(0)
+        for feature in BitmaskFeature:
+            supported |= feature
+        return supported
+
+    @staticmethod
+    def from_array(features_array: bytes) -> Optional['BitmaskFeature']:
+        """
+        Get features from bytearray.
+
+        :param features_array: Feature bitmask as array,
+        :return: Return features.
+        """
+        if features_array is None:
+            return None
+        return BitmaskFeature.from_bytes(features_array, byteorder=PROTOCOL_BYTE_ORDER)
diff --git a/pyignite/connection/connection.py b/pyignite/connection/connection.py
index 901cb56..7d5778c 100644
--- a/pyignite/connection/connection.py
+++ b/pyignite/connection/connection.py
@@ -13,29 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
 from collections import OrderedDict
 import socket
 from typing import Union
 
 from pyignite.constants import PROTOCOLS, IGNITE_DEFAULT_HOST, IGNITE_DEFAULT_PORT, PROTOCOL_BYTE_ORDER
 from pyignite.exceptions import HandshakeError, SocketError, connection_errors, AuthenticationError
+from .bitmask_feature import BitmaskFeature
 
 from .handshake import HandshakeRequest, HandshakeResponse
+from .protocol_context import ProtocolContext
 from .ssl import wrap, check_ssl_params
 from ..stream import BinaryStream
 
@@ -83,19 +70,18 @@ class BaseConnection:
         return '{}:{}'.format(self.host or '?', self.port or '?')
 
     @property
-    def protocol_version(self):
+    def protocol_context(self):
         """
-        Returns the tuple of major, minor, and revision numbers of the used
-        thin protocol version, or None, if no connection to the Ignite cluster
-        was yet established.
+        Returns protocol context, or None, if no connection to the Ignite
+        cluster was yet established.
         """
-        return self.client.protocol_version
+        return self.client.protocol_context
 
     def _process_handshake_error(self, response):
         error_text = f'Handshake error: {response.message}'
         # if handshake fails for any reason other than protocol mismatch
         # (i.e. authentication error), server version is 0.0.0
-        protocol_version = self.client.protocol_version
+        protocol_version = self.client.protocol_context.version
         server_version = (response.version_major, response.version_minor, response.version_patch)
 
         if any(server_version):
@@ -118,7 +104,7 @@ class Connection(BaseConnection):
      * binary protocol connector. Encapsulates handshake and failover reconnection.
     """
 
-    def __init__(self, client: 'Client', host: str, port: int, timeout: float = 2.0,
+    def __init__(self, client: 'Client', host: str, port: int, timeout: float = None,
                  username: str = None, password: str = None, **ssl_params):
         """
         Initialize connection.
@@ -180,25 +166,27 @@ class Connection(BaseConnection):
         detecting_protocol = False
 
         # choose highest version first
-        if self.client.protocol_version is None:
+        if self.client.protocol_context is None:
             detecting_protocol = True
-            self.client.protocol_version = max(PROTOCOLS)
+            self.client.protocol_context = ProtocolContext(max(PROTOCOLS), BitmaskFeature.all_supported())
 
         try:
             result = self._connect_version()
         except HandshakeError as e:
             if e.expected_version in PROTOCOLS:
-                self.client.protocol_version = e.expected_version
+                self.client.protocol_context.version = e.expected_version
                 result = self._connect_version()
             else:
                 raise e
         except connection_errors:
             # restore undefined protocol version
             if detecting_protocol:
-                self.client.protocol_version = None
+                self.client.protocol_context = None
             raise
 
         # connection is ready for end user
+        features = BitmaskFeature.from_array(result.get('features', None))
+        self.client.protocol_context.features = features
         self.uuid = result.get('node_uuid', None)  # version-specific (1.4+)
         self.failed = False
         return result
@@ -214,10 +202,10 @@ class Connection(BaseConnection):
         self._socket = wrap(self._socket, self.ssl_params)
         self._socket.connect((self.host, self.port))
 
-        protocol_version = self.client.protocol_version
+        protocol_context = self.client.protocol_context
 
         hs_request = HandshakeRequest(
-            protocol_version,
+            protocol_context,
             self.username,
             self.password
         )
@@ -227,7 +215,7 @@ class Connection(BaseConnection):
             self.send(stream.getbuffer(), reconnect=False)
 
         with BinaryStream(self.client, self.recv(reconnect=False)) as stream:
-            hs_response = HandshakeResponse.parse(stream, self.protocol_version)
+            hs_response = HandshakeResponse.parse(stream, self.protocol_context)
 
             if hs_response.op_code == 0:
                 self.close()
diff --git a/pyignite/connection/handshake.py b/pyignite/connection/handshake.py
index 0b0fe50..af7bdb3 100644
--- a/pyignite/connection/handshake.py
+++ b/pyignite/connection/handshake.py
@@ -13,9 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import Optional, Tuple
+from typing import Optional
 
-from pyignite.datatypes import Byte, Int, Short, String, UUIDObject
+from pyignite.connection.protocol_context import ProtocolContext
+from pyignite.datatypes import Byte, Int, Short, String, UUIDObject, ByteArrayObject
 from pyignite.datatypes.internal import Struct
 from pyignite.stream import READ_BACKWARD
 
@@ -27,10 +28,10 @@ class HandshakeRequest:
     handshake_struct = None
     username = None
     password = None
-    protocol_version = None
+    protocol_context = None
 
     def __init__(
-        self, protocol_version: Tuple[int, int, int],
+        self, protocol_context: 'ProtocolContext',
         username: Optional[str] = None, password: Optional[str] = None
     ):
         fields = [
@@ -41,7 +42,9 @@ class HandshakeRequest:
             ('version_patch', Short),
             ('client_code', Byte),
         ]
-        self.protocol_version = protocol_version
+        self.protocol_context = protocol_context
+        if self.protocol_context.is_feature_flags_supported():
+            fields.append(('features', ByteArrayObject))
         if username and password:
             self.username = username
             self.password = password
@@ -58,14 +61,19 @@ class HandshakeRequest:
         await self.handshake_struct.from_python_async(stream, self.__create_handshake_data())
 
     def __create_handshake_data(self):
+        version = self.protocol_context.version
         handshake_data = {
             'length': 8,
             'op_code': OP_HANDSHAKE,
-            'version_major': self.protocol_version[0],
-            'version_minor': self.protocol_version[1],
-            'version_patch': self.protocol_version[2],
+            'version_major': version[0],
+            'version_minor': version[1],
+            'version_patch': version[2],
             'client_code': 2,  # fixed value defined by protocol
         }
+        if self.protocol_context.is_feature_flags_supported():
+            features = bytes(self.protocol_context.features)
+            handshake_data['features'] = features
+            handshake_data['length'] += 5 + len(features)
         if self.username and self.password:
             handshake_data.update({
                 'username': self.username,
@@ -96,12 +104,12 @@ class HandshakeResponse(dict):
         return self.get(item)
 
     @classmethod
-    def parse(cls, stream, protocol_version):
+    def parse(cls, stream, protocol_context):
         start_class = cls.__response_start.parse(stream)
         start = stream.read_ctype(start_class, direction=READ_BACKWARD)
         data = cls.__response_start.to_python(start)
 
-        response_end = cls.__create_response_end(data, protocol_version)
+        response_end = cls.__create_response_end(data, protocol_context)
         if response_end:
             end_class = response_end.parse(stream)
             end = stream.read_ctype(end_class, direction=READ_BACKWARD)
@@ -110,12 +118,12 @@ class HandshakeResponse(dict):
         return cls(data)
 
     @classmethod
-    async def parse_async(cls, stream, protocol_version):
+    async def parse_async(cls, stream, protocol_context):
         start_class = cls.__response_start.parse(stream)
         start = stream.read_ctype(start_class, direction=READ_BACKWARD)
         data = await cls.__response_start.to_python_async(start)
 
-        response_end = cls.__create_response_end(data, protocol_version)
+        response_end = cls.__create_response_end(data, protocol_context)
         if response_end:
             end_class = await response_end.parse_async(stream)
             end = stream.read_ctype(end_class, direction=READ_BACKWARD)
@@ -124,7 +132,7 @@ class HandshakeResponse(dict):
         return cls(data)
 
     @classmethod
-    def __create_response_end(cls, start_data, protocol_version):
+    def __create_response_end(cls, start_data, protocol_context):
         response_end = None
         if start_data['op_code'] == 0:
             response_end = Struct([
@@ -134,7 +142,12 @@ class HandshakeResponse(dict):
                 ('message', String),
                 ('client_status', Int)
             ])
-        elif protocol_version >= (1, 4, 0):
+        elif protocol_context.is_feature_flags_supported():
+            response_end = Struct([
+                ('features', ByteArrayObject),
+                ('node_uuid', UUIDObject),
+            ])
+        elif protocol_context.is_partition_awareness_supported():
             response_end = Struct([
                 ('node_uuid', UUIDObject),
             ])
diff --git a/pyignite/connection/protocol_context.py b/pyignite/connection/protocol_context.py
new file mode 100644
index 0000000..54f5240
--- /dev/null
+++ b/pyignite/connection/protocol_context.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Tuple
+
+from pyignite.connection.bitmask_feature import BitmaskFeature
+
+
+class ProtocolContext:
+    """
+    Protocol context. Provides the ability to easily check supported
+    protocol features.
+    """
+
+    def __init__(self, version: Tuple[int, int, int], features: BitmaskFeature = None):
+        self._version = version
+        self._features = features
+        self._ensure_consistency()
+
+    def __hash__(self):
+        return hash((self._version, self._features))
+
+    def __eq__(self, other):
+        return isinstance(other, ProtocolContext) and \
+            self.version == other.version and \
+            self.features == other.features
+
+    def _ensure_consistency(self):
+        if not self.is_feature_flags_supported():
+            self._features = None
+
+    @property
+    def version(self):
+        return getattr(self, '_version', None)
+
+    @version.setter
+    def version(self, version: Tuple[int, int, int]):
+        """
+        Set version.
+
+        This call may result in features being reset to None if the protocol
+        version does not support feature masks.
+
+        :param version: Version to set.
+        """
+        setattr(self, '_version', version)
+        self._ensure_consistency()
+
+    @property
+    def features(self):
+        return getattr(self, '_features', None)
+
+    @features.setter
+    def features(self, features: BitmaskFeature):
+        """
+        Try and set new feature set.
+
+        If features are not supported by the protocol, None is set as features
+        instead.
+
+        :param features: Features to set.
+        """
+        setattr(self, '_features', features)
+        self._ensure_consistency()
+
+    def is_partition_awareness_supported(self) -> bool:
+        """
+        Check whether partition awareness is supported by the current protocol.
+        """
+        return self.version >= (1, 4, 0)
+
+    def is_status_flags_supported(self) -> bool:
+        """
+        Check whether status flags are supported by the current protocol.
+        """
+        return self.version >= (1, 4, 0)
+
+    def is_feature_flags_supported(self) -> bool:
+        """
+        Check whether feature flags are supported by the current protocol.
+        """
+        return self.version >= (1, 7, 0)
+
+    def is_cluster_api_supported(self) -> bool:
+        """
+        Check whether cluster API is supported by the current protocol.
+        """
+        return bool(self.features and BitmaskFeature.CLUSTER_API in self.features)
diff --git a/pyignite/constants.py b/pyignite/constants.py
index 02f7124..c08a3ce 100644
--- a/pyignite/constants.py
+++ b/pyignite/constants.py
@@ -31,14 +31,17 @@ __all__ = [
 ]
 
 PROTOCOLS = {
+    (1, 7, 0),
+    (1, 6, 0),
+    (1, 5, 0),
     (1, 4, 0),
     (1, 3, 0),
     (1, 2, 0),
 }
 
 PROTOCOL_VERSION_MAJOR = 1
-PROTOCOL_VERSION_MINOR = 4
-PROTOCOL_VERSION_PATCH = 0
+PROTOCOL_VERSION_MINOR = 7
+PROTOCOL_VERSION_PATCH = 1
 
 MAX_LONG = 9223372036854775807
 MIN_LONG = -9223372036854775808
diff --git a/pyignite/datatypes/cluster_state.py b/pyignite/datatypes/cluster_state.py
new file mode 100644
index 0000000..863a1d2
--- /dev/null
+++ b/pyignite/datatypes/cluster_state.py
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from enum import IntEnum
+
+
+class ClusterState(IntEnum):
+    #: Cluster deactivated. Cache operations aren't allowed.
+    INACTIVE = 0
+
+    #: Cluster activated. All cache operations are allowed.
+    ACTIVE = 1
+
+    #: Cluster activated. Cache read operations are allowed, cache data change
+    #: operations aren't allowed.
+    ACTIVE_READ_ONLY = 2
diff --git a/pyignite/exceptions.py b/pyignite/exceptions.py
index 579aa29..215ccd0 100644
--- a/pyignite/exceptions.py
+++ b/pyignite/exceptions.py
@@ -65,7 +65,7 @@ class ParameterError(Exception):
 
 class CacheError(Exception):
     """
-    This exception is raised, whenever any remote Thin client operation
+    This exception is raised, whenever any remote Thin client cache operation
     returns an error.
     """
     pass
@@ -93,4 +93,20 @@ class SQLError(CacheError):
     pass
 
 
+class ClusterError(Exception):
+    """
+    This exception is raised, whenever any remote Thin client cluster operation
+    returns an error.
+    """
+    pass
+
+
+class NotSupportedByClusterError(Exception):
+    """
+    This exception is raised, whenever the cluster does not support a specific
+    operation, probably because it is outdated.
+    """
+    pass
+
+
 connection_errors = (IOError, OSError, EOFError)
diff --git a/pyignite/queries/op_codes.py b/pyignite/queries/op_codes.py
index 7372713..c152f7c 100644
--- a/pyignite/queries/op_codes.py
+++ b/pyignite/queries/op_codes.py
@@ -61,7 +61,10 @@ OP_QUERY_SQL_CURSOR_GET_PAGE = 2003
 OP_QUERY_SQL_FIELDS = 2004
 OP_QUERY_SQL_FIELDS_CURSOR_GET_PAGE = 2005
 
-P_GET_BINARY_TYPE_NAME = 3000
+OP_GET_BINARY_TYPE_NAME = 3000
 OP_REGISTER_BINARY_TYPE_NAME = 3001
 OP_GET_BINARY_TYPE = 3002
 OP_PUT_BINARY_TYPE = 3003
+
+OP_CLUSTER_GET_STATE = 5000
+OP_CLUSTER_CHANGE_STATE = 5001
diff --git a/pyignite/queries/query.py b/pyignite/queries/query.py
index beea5d9..d9e6aaf 100644
--- a/pyignite/queries/query.py
+++ b/pyignite/queries/query.py
@@ -124,7 +124,7 @@ class Query:
             self.from_python(stream, query_params)
             response_data = conn.request(stream.getbuffer())
 
-        response_struct = self.response_type(protocol_version=conn.protocol_version,
+        response_struct = self.response_type(protocol_context=conn.protocol_context,
                                              following=response_config, **kwargs)
 
         with BinaryStream(conn.client, response_data) as stream:
@@ -156,7 +156,7 @@ class Query:
             await self.from_python_async(stream, query_params)
             data = await conn.request(stream.getbuffer())
 
-        response_struct = self.response_type(protocol_version=conn.protocol_version,
+        response_struct = self.response_type(protocol_context=conn.protocol_context,
                                              following=response_config, **kwargs)
 
         with AioBinaryStream(conn.client, data) as stream:
diff --git a/pyignite/queries/response.py b/pyignite/queries/response.py
index 83a6e6a..6495802 100644
--- a/pyignite/queries/response.py
+++ b/pyignite/queries/response.py
@@ -19,6 +19,7 @@ import attr
 from collections import OrderedDict
 import ctypes
 
+from pyignite.connection.protocol_context import ProtocolContext
 from pyignite.constants import RHF_TOPOLOGY_CHANGED, RHF_ERROR
 from pyignite.datatypes import AnyDataObject, Bool, Int, Long, String, StringArray, Struct
 from pyignite.datatypes.binary import body_struct, enum_struct, schema_struct
@@ -29,7 +30,7 @@ from pyignite.stream import READ_BACKWARD
 @attr.s
 class Response:
     following = attr.ib(type=list, factory=list)
-    protocol_version = attr.ib(type=tuple, factory=tuple)
+    protocol_context = attr.ib(type=ProtocolContext, default=None)
     _response_header = None
     _response_class_name = 'Response'
 
@@ -44,7 +45,7 @@ class Response:
                 ('query_id', ctypes.c_longlong),
             ]
 
-            if self.protocol_version and self.protocol_version >= (1, 4, 0):
+            if self.protocol_context.is_status_flags_supported():
                 fields.append(('flags', ctypes.c_short))
             else:
                 fields.append(('status_code', ctypes.c_int),)
@@ -68,7 +69,7 @@ class Response:
 
         fields = []
         has_error = False
-        if self.protocol_version and self.protocol_version >= (1, 4, 0):
+        if self.protocol_context.is_status_flags_supported():
             if header.flags & RHF_TOPOLOGY_CHANGED:
                 fields = [
                     ('affinity_version', ctypes.c_longlong),
diff --git a/pyignite/stream/aio_cluster.py b/pyignite/stream/aio_cluster.py
new file mode 100644
index 0000000..8a2f98e
--- /dev/null
+++ b/pyignite/stream/aio_cluster.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module contains `AioCluster` that lets you get info and change state of the
+whole cluster.
+"""
+from pyignite import AioClient
+from pyignite.api.cluster import cluster_get_state_async, cluster_set_state_async
+
+
+class AioCluster:
+    """
+    Ignite cluster abstraction. Users should never use this class directly,
+    but construct its instances with
+    :py:meth:`~pyignite.aio_client.AioClient.get_cluster` method instead.
+    """
+
+    def __init__(self, client: 'AioClient'):
+        self._client = client
+
+    async def get_state(self):
+        """
+        Gets current cluster state.
+
+        :return: Current cluster state. This is one of ClusterState.INACTIVE,
+         ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
+        """
+        return await cluster_get_state_async(await self._client.random_node())
+
+    async def set_state(self, state):
+        """
+        Changes current cluster state to the given.
+
+        Note: Deactivation clears in-memory caches (without persistence)
+         including the system caches.
+
+        :param state: New cluster state. This is one of ClusterState.INACTIVE,
+         ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
+        """
+        return await cluster_set_state_async(await self._client.random_node(), state)
diff --git a/tests/config/ignite-config.xml.jinja2 b/tests/config/ignite-config.xml.jinja2
index 2bf5129..325a581 100644
--- a/tests/config/ignite-config.xml.jinja2
+++ b/tests/config/ignite-config.xml.jinja2
@@ -31,7 +31,7 @@
             <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
                 <property name="defaultDataRegionConfiguration">
                     <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
-                        {% if use_auth %}
+                        {% if use_persistence %}
                         <property name="persistenceEnabled" value="true"/>
                         {% endif %}
                     </bean>
@@ -51,9 +51,8 @@
         <property name="authenticationEnabled" value="true"/>
         {% endif %}
 
-
         {% if use_ssl %}
-            <property name="connectorConfiguration"><null/></property>
+        <property name="connectorConfiguration"><null/></property>
         {% endif %}
 
         <property name="clientConnectorConfiguration">
diff --git a/tests/custom/test_cluster.py b/tests/custom/test_cluster.py
new file mode 100644
index 0000000..e82e238
--- /dev/null
+++ b/tests/custom/test_cluster.py
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from pyignite import Client, AioClient
+from pyignite.exceptions import CacheError
+from tests.util import clear_ignite_work_dir, start_ignite_gen
+
+from pyignite.datatypes.cluster_state import ClusterState
+
+
+@pytest.fixture(params=['with-persistence', 'without-persistence'])
+def with_persistence(request):
+    yield request.param == 'with-persistence'
+
+
+@pytest.fixture(autouse=True)
+def cleanup():
+    clear_ignite_work_dir()
+    yield None
+    clear_ignite_work_dir()
+
+
+@pytest.fixture(autouse=True)
+def server1(with_persistence, cleanup):
+    yield from start_ignite_gen(idx=1, use_persistence=with_persistence)
+
+
+@pytest.fixture(autouse=True)
+def server2(with_persistence, cleanup):
+    yield from start_ignite_gen(idx=2, use_persistence=with_persistence)
+
+
+def test_cluster_set_active(with_persistence):
+    key = 42
+    val = 42
+    start_state = ClusterState.INACTIVE if with_persistence else ClusterState.ACTIVE
+
+    client = Client()
+    with client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802)]):
+        cluster = client.get_cluster()
+        assert cluster.get_state() == start_state
+
+        cluster.set_state(ClusterState.ACTIVE)
+        assert cluster.get_state() == ClusterState.ACTIVE
+
+        cache = client.get_or_create_cache("test_cache")
+        cache.put(key, val)
+        assert cache.get(key) == val
+
+        cluster.set_state(ClusterState.ACTIVE_READ_ONLY)
+        assert cluster.get_state() == ClusterState.ACTIVE_READ_ONLY
+
+        assert cache.get(key) == val
+        with pytest.raises(CacheError):
+            cache.put(key, val + 1)
+
+        cluster.set_state(ClusterState.INACTIVE)
+        assert cluster.get_state() == ClusterState.INACTIVE
+
+        with pytest.raises(CacheError):
+            cache.get(key)
+
+        with pytest.raises(CacheError):
+            cache.put(key, val + 1)
+
+        cluster.set_state(ClusterState.ACTIVE)
+        assert cluster.get_state() == ClusterState.ACTIVE
+
+        cache.put(key, val + 2)
+        assert cache.get(key) == val + 2
+
+
+@pytest.mark.asyncio
+async def test_cluster_set_active_async(with_persistence):
+    key = 42
+    val = 42
+    start_state = ClusterState.INACTIVE if with_persistence else ClusterState.ACTIVE
+
+    client = AioClient()
+    async with client.connect([("127.0.0.1", 10801), ("127.0.0.1", 10802)]):
+        cluster = client.get_cluster()
+        assert await cluster.get_state() == start_state
+
+        await cluster.set_state(ClusterState.ACTIVE)
+        assert await cluster.get_state() == ClusterState.ACTIVE
+
+        cache = await client.get_or_create_cache("test_cache")
+        await cache.put(key, val)
+        assert await cache.get(key) == val
+
+        await cluster.set_state(ClusterState.ACTIVE_READ_ONLY)
+        assert await cluster.get_state() == ClusterState.ACTIVE_READ_ONLY
+
+        assert await cache.get(key) == val
+        with pytest.raises(CacheError):
+            await cache.put(key, val + 1)
+
+        await cluster.set_state(ClusterState.INACTIVE)
+        assert await cluster.get_state() == ClusterState.INACTIVE
+
+        with pytest.raises(CacheError):
+            await cache.get(key)
+
+        with pytest.raises(CacheError):
+            await cache.put(key, val + 1)
+
+        await cluster.set_state(ClusterState.ACTIVE)
+        assert await cluster.get_state() == ClusterState.ACTIVE
+
+        await cache.put(key, val + 2)
+        assert await cache.get(key) == val + 2
diff --git a/tests/util.py b/tests/util.py
index 5651739..af3b70e 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -155,7 +155,7 @@ def create_config_file(tpl_name, file_name, **kwargs):
         f.write(template.render(**kwargs))
 
 
-def start_ignite(idx=1, debug=False, use_ssl=False, use_auth=False):
+def start_ignite(idx=1, debug=False, use_ssl=False, use_auth=False, use_persistence=False):
     clear_logs(idx)
 
     runner = get_ignite_runner()
@@ -166,8 +166,16 @@ def start_ignite(idx=1, debug=False, use_ssl=False, use_auth=False):
         env["JVM_OPTS"] = "-Djava.net.preferIPv4Stack=true -Xdebug -Xnoagent -Djava.compiler=NONE " \
                           "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 "
 
-    params = {'ignite_instance_idx': str(idx), 'ignite_client_port': 10800 + idx, 'use_ssl': use_ssl,
-              'use_auth': use_auth}
+    if use_auth:
+        use_persistence = True
+
+    params = {
+        'ignite_instance_idx': str(idx),
+        'ignite_client_port': 10800 + idx,
+        'use_ssl': use_ssl,
+        'use_auth': use_auth,
+        'use_persistence': use_persistence,
+    }
 
     create_config_file('log4j.xml.jinja2', f'log4j-{idx}.xml', **params)
     create_config_file('ignite-config.xml.jinja2', f'ignite-config-{idx}.xml', **params)
@@ -177,7 +185,7 @@ def start_ignite(idx=1, debug=False, use_ssl=False, use_auth=False):
 
     srv = subprocess.Popen(ignite_cmd, env=env, cwd=get_test_dir())
 
-    started = wait_for_condition(lambda: check_server_started(idx), timeout=30)
+    started = wait_for_condition(lambda: check_server_started(idx), timeout=60)
     if started:
         return srv
 
@@ -185,8 +193,8 @@ def start_ignite(idx=1, debug=False, use_ssl=False, use_auth=False):
     raise Exception("Failed to start Ignite: timeout while trying to connect")
 
 
-def start_ignite_gen(idx=1, use_ssl=False, use_auth=False):
-    srv = start_ignite(idx, use_ssl=use_ssl, use_auth=use_auth)
+def start_ignite_gen(idx=1, use_ssl=False, use_auth=False, use_persistence=False):
+    srv = start_ignite(idx, use_ssl=use_ssl, use_auth=use_auth, use_persistence=use_persistence)
     try:
         yield srv
     finally: