Posted to commits@ignite.apache.org by is...@apache.org on 2021/02/19 09:36:58 UTC

[ignite-python-thin-client] branch master updated: IGNITE-14211 Remove existing cache requirement from SQL API

This is an automated email from the ASF dual-hosted git repository.

isapego pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-python-thin-client.git


The following commit(s) were added to refs/heads/master by this push:
     new 672a767  IGNITE-14211 Remove existing cache requirement from SQL API
672a767 is described below

commit 672a767bf37f43eac4d5415d42a7c62527a1916f
Author: Igor Sapego <ig...@gmail.com>
AuthorDate: Fri Feb 19 12:36:16 2021 +0300

    IGNITE-14211 Remove existing cache requirement from SQL API
    
    This closes #18
---
 pyignite/api/sql.py       | 23 +++++--------
 pyignite/client.py        | 18 ++++++----
 pyignite/utils.py         |  3 ++
 tests/test_binary.py      |  5 ---
 tests/test_cache_class.py |  4 +--
 tests/test_sql.py         | 84 ++++++++++++++++++++++++++++++++++++++++-------
 6 files changed, 98 insertions(+), 39 deletions(-)
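
In practical terms, the change lets Client.sql() run against the PUBLIC schema without any cache existing up front. A minimal sketch of the new behavior, assuming a local Ignite node on the default port 10800:

    from pyignite import Client

    client = Client()
    client.connect('127.0.0.1', 10800)

    # Before this patch, Client.sql() resolved the schema through
    # client.get_cache(schema), so a 'PUBLIC' cache had to exist.
    # Now the request carries cache id 0 plus an explicit schema name.
    client.sql('CREATE TABLE City (id INT PRIMARY KEY, name VARCHAR)')
    client.sql('DROP TABLE City IF EXISTS')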

diff --git a/pyignite/api/sql.py b/pyignite/api/sql.py
index 73cacc6..dc470d1 100644
--- a/pyignite/api/sql.py
+++ b/pyignite/api/sql.py
@@ -283,36 +283,31 @@ def sql_fields(
     Performs SQL fields query.
 
     :param conn: connection to Ignite server,
-    :param cache: name or ID of the cache,
+    :param cache: name or ID of the cache. If zero, the schema is used instead.
     :param query_str: SQL query string,
     :param page_size: cursor page size,
     :param query_args: (optional) query arguments. List of values or
      (value, type hint) tuples,
-    :param schema: (optional) schema for the query. Defaults to `PUBLIC`,
+    :param schema: schema for the query.
     :param statement_type: (optional) statement type. Can be:
 
      * StatementType.ALL − any type (default),
      * StatementType.SELECT − select,
      * StatementType.UPDATE − update.
 
-    :param distributed_joins: (optional) distributed joins. Defaults to False,
+    :param distributed_joins: (optional) distributed joins.
     :param local: (optional) pass True if this query should be executed
-     on local node only. Defaults to False,
+     on local node only.
     :param replicated_only: (optional) whether query contains only
-     replicated tables or not. Defaults to False,
-    :param enforce_join_order: (optional) enforce join order. Defaults
-     to False,
+     replicated tables or not.
+    :param enforce_join_order: (optional) enforce join order.
     :param collocated: (optional) whether your data is co-located or not.
-     Defaults to False,
-    :param lazy: (optional) lazy query execution. Defaults to False,
+    :param lazy: (optional) lazy query execution.
     :param include_field_names: (optional) include field names in result.
-     Defaults to False,
-    :param max_rows: (optional) query-wide maximum of rows. Defaults to -1
-     (all rows),
+    :param max_rows: (optional) query-wide maximum of rows.
     :param timeout: (optional) non-negative timeout value in ms. Zero disables
-     timeout (default),
+     timeout.
     :param binary: (optional) pass True to keep the value in binary form.
-     False by default,
     :param query_id: (optional) a value generated by client and returned as-is
      in response.query_id. When the parameter is omitted, a random value
      is generated,
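
The new contract, mirrored by the tests further down this diff: pass cache id 0 together with an explicit schema name, or a non-zero cache id with no schema. A sketch against the low-level API, again assuming a node on 127.0.0.1:10800:

    from pyignite import Client
    from pyignite.api import sql_fields

    client = Client()
    client.connect('127.0.0.1', 10800)
    conn = client.random_node

    # Cache id 0: the server resolves tables through the given schema,
    # so no cache needs to exist before the query is issued.
    result = sql_fields(conn, 0, 'SELECT 1', 1, schema='PUBLIC',
                        include_field_names=True)
    assert result.status == 0, result.message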
diff --git a/pyignite/client.py b/pyignite/client.py
index 77c6373..9416474 100644
--- a/pyignite/client.py
+++ b/pyignite/client.py
@@ -58,7 +58,7 @@ from .exceptions import (
     BinaryTypeError, CacheError, ReconnectError, SQLError, connection_errors,
 )
 from .utils import (
-    capitalize, entity_id, schema_id, process_delimiter,
+    cache_id, capitalize, entity_id, schema_id, process_delimiter,
     status_to_exception, is_iterable,
 )
 from .binary import GenericObjectMeta
@@ -513,13 +513,14 @@ class Client:
         return cache_get_names(self.random_node)
 
     def sql(
-        self, query_str: str, page_size: int = 1024, query_args: Iterable = None,
-        schema: Union[int, str] = 'PUBLIC',
+        self, query_str: str, page_size: int = 1024,
+        query_args: Iterable = None, schema: str = 'PUBLIC',
         statement_type: int = 0, distributed_joins: bool = False,
         local: bool = False, replicated_only: bool = False,
         enforce_join_order: bool = False, collocated: bool = False,
         lazy: bool = False, include_field_names: bool = False,
         max_rows: int = -1, timeout: int = 0,
+        cache: Union[int, str, Cache] = None
     ):
         """
         Runs an SQL query and returns its result.
@@ -553,6 +554,8 @@ class Client:
          (all rows),
         :param timeout: (optional) non-negative timeout value in ms.
          Zero disables timeout (default),
+        :param cache: (optional) name or ID of the cache used to infer
+         the schema. If set, the 'schema' argument is ignored,
         :return: generator with result rows as lists. If
          `include_field_names` was set, the first row will hold field names.
         """
@@ -580,10 +583,13 @@ class Client:
 
         conn = self.random_node
 
-        schema = self.get_cache(schema)
+        c_id = cache.cache_id if isinstance(cache, Cache) else cache_id(cache)
+
+        if c_id != 0:
+            schema = None
+
         result = sql_fields(
-            conn, schema.cache_id, query_str,
-            page_size, query_args, schema.name,
+            conn, c_id, query_str, page_size, query_args, schema,
             statement_type, distributed_joins, local, replicated_only,
             enforce_join_order, collocated, lazy, include_field_names,
             max_rows, timeout,
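
Client.sql() thus offers two addressing modes: the default PUBLIC schema (or any schema name), or a cache from which the server infers the schema. A short sketch; the cache name SQL_PUBLIC_PERSON is an assumption based on the default naming Ignite applies to SQL-created tables:

    # Schema-based addressing: no cache needs to exist beforehand.
    client.sql('CREATE TABLE Person (id INT PRIMARY KEY, name VARCHAR)')

    # Cache-based addressing: 'schema' is ignored when 'cache' is given.
    # The argument may be a Cache instance, a cache name, or a numeric id.
    rows = client.sql('SELECT name FROM Person', cache='SQL_PUBLIC_PERSON')

test_query_with_cache at the end of this diff exercises all three forms of the cache argument.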
diff --git a/pyignite/utils.py b/pyignite/utils.py
index 67f164f..f1a7f90 100644
--- a/pyignite/utils.py
+++ b/pyignite/utils.py
@@ -105,6 +105,9 @@ def hashcode(data: Union[str, bytes, bytearray, memoryview]) -> int:
 
 
 def __hashcode_fallback(data: Union[str, bytes, bytearray, memoryview]) -> int:
+    if data is None:
+        return 0
+
     if isinstance(data, str):
         """
         For strings we iterate over code points, which are of the int type
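
This guard is what makes the client.py change above work: a cache argument of None flows through cache_id(), which hashes cache names, and must come out as 0, the "no cache, resolve by schema" marker. A sketch of the intended semantics, assuming the pure-Python hashcode fallback is in use:

    from pyignite.utils import cache_id

    # No cache given: id 0 tells the server to resolve tables by schema.
    assert cache_id(None) == 0

    # Cache names are hashed to ids; integer ids pass through unchanged
    # (the passthrough is an assumption about cache_id, not shown in
    # this diff).
    print(cache_id('my_cache'), cache_id(42))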
diff --git a/tests/test_binary.py b/tests/test_binary.py
index 45d1d25..5fa2ec4 100644
--- a/tests/test_binary.py
+++ b/tests/test_binary.py
@@ -63,8 +63,6 @@ drop_query = 'DROP TABLE {} IF EXISTS'.format(table_sql_name)
 
 
 def test_sql_read_as_binary(client):
-
-    client.get_or_create_cache(scheme_name)
     client.sql(drop_query)
 
     # create table
@@ -92,9 +90,6 @@ def test_sql_read_as_binary(client):
 
 
 def test_sql_write_as_binary(client):
-
-    client.get_or_create_cache(scheme_name)
-
     # configure cache as an SQL table
     type_name = table_cache_name
 
diff --git a/tests/test_cache_class.py b/tests/test_cache_class.py
index 1df0d44..940160a 100644
--- a/tests/test_cache_class.py
+++ b/tests/test_cache_class.py
@@ -62,9 +62,7 @@ def test_cache_remove(client):
 
 
 def test_cache_get(client):
-    client.get_or_create_cache('my_cache')
-
-    my_cache = client.get_cache('my_cache')
+    my_cache = client.get_or_create_cache('my_cache')
     assert my_cache.settings[PROP_NAME] == 'my_cache'
     my_cache.destroy()
 
diff --git a/tests/test_sql.py b/tests/test_sql.py
index c896afb..f25fedd 100644
--- a/tests/test_sql.py
+++ b/tests/test_sql.py
@@ -20,12 +20,12 @@ from pyignite.api import (
     sql, sql_cursor_get_page,
     cache_get_configuration,
 )
+from pyignite.datatypes.cache_config import CacheMode
 from pyignite.datatypes.prop_codes import *
 from pyignite.exceptions import SQLError
 from pyignite.utils import entity_id
 from pyignite.binary import unwrap_binary
 
-
 initial_data = [
         ('John', 'Doe', 5),
         ('Jane', 'Roe', 4),
@@ -59,9 +59,10 @@ def test_sql(client):
 
     result = sql_fields(
         conn,
-        'PUBLIC',
+        0,
         create_query,
         page_size,
+        schema='PUBLIC',
         include_field_names=True
     )
     assert result.status == 0, result.message
@@ -70,9 +71,10 @@ def test_sql(client):
         fname, lname, grade = data_line
         result = sql_fields(
             conn,
-            'PUBLIC',
+            0,
             insert_query,
             page_size,
+            schema='PUBLIC',
             query_args=[i, fname, lname, grade],
             include_field_names=True
         )
@@ -108,7 +110,7 @@ def test_sql(client):
             assert data.type_id == entity_id(binary_type_name)
 
     # repeat cleanup
-    result = sql_fields(conn, 'PUBLIC', drop_query, page_size)
+    result = sql_fields(conn, 0, drop_query, page_size, schema='PUBLIC')
     assert result.status == 0
 
 
@@ -121,9 +123,10 @@ def test_sql_fields(client):
 
     result = sql_fields(
         conn,
-        'PUBLIC',
+        0,
         create_query,
         page_size,
+        schema='PUBLIC',
         include_field_names=True
     )
     assert result.status == 0, result.message
@@ -132,9 +135,10 @@ def test_sql_fields(client):
         fname, lname, grade = data_line
         result = sql_fields(
             conn,
-            'PUBLIC',
+            0,
             insert_query,
             page_size,
+            schema='PUBLIC',
             query_args=[i, fname, lname, grade],
             include_field_names=True
         )
@@ -142,9 +146,10 @@ def test_sql_fields(client):
 
     result = sql_fields(
         conn,
-        'PUBLIC',
+        0,
         select_query,
         page_size,
+        schema='PUBLIC',
         include_field_names=True
     )
     assert result.status == 0
@@ -159,7 +164,7 @@ def test_sql_fields(client):
     assert result.value['more'] is False
 
     # repeat cleanup
-    result = sql_fields(conn, 'PUBLIC', drop_query, page_size)
+    result = sql_fields(conn, 0, drop_query, page_size, schema='PUBLIC')
     assert result.status == 0
 
 
@@ -176,7 +181,7 @@ def test_long_multipage_query(client):
 
     client.sql('DROP TABLE LongMultipageQuery IF EXISTS')
 
-    client.sql("CREATE TABLE LongMultiPageQuery (%s, %s)" % \
+    client.sql("CREATE TABLE LongMultiPageQuery (%s, %s)" %
                (fields[0] + " INT(11) PRIMARY KEY", ",".join(map(lambda f: f + " INT(11)", fields[1:]))))
 
     for id in range(1, 21):
@@ -193,6 +198,63 @@ def test_long_multipage_query(client):
     client.sql(drop_query)
 
 
-def test_sql_not_create_cache(client):
+def test_sql_not_create_cache_with_cache(client):
     with pytest.raises(SQLError, match=r".*Cache does not exist.*"):
-        client.sql(schema='IS_NOT_EXISTING', query_str='select * from IsNotExisting')
+        client.sql(schema=None, cache='NOT_EXISTING', query_str='select * from NotExisting')
+
+
+def test_sql_not_create_cache_with_schema(client):
+    with pytest.raises(SQLError, match=r".*Failed to set schema.*"):
+        client.sql(schema='NOT_EXISTING', query_str='select * from NotExisting')
+
+
+def test_query_with_cache(client):
+    test_key = 42
+    test_value = 'Lorem ipsum'
+
+    cache_name = test_query_with_cache.__name__.upper()
+    schema_name = f'{cache_name}_schema'.upper()
+    table_name = f'{cache_name}_table'.upper()
+
+    cache = client.create_cache({
+        PROP_NAME: cache_name,
+        PROP_SQL_SCHEMA: schema_name,
+        PROP_CACHE_MODE: CacheMode.PARTITIONED,
+        PROP_QUERY_ENTITIES: [
+            {
+                'table_name': table_name,
+                'key_field_name': 'KEY',
+                'value_field_name': 'VALUE',
+                'key_type_name': 'java.lang.Long',
+                'value_type_name': 'java.lang.String',
+                'query_indexes': [],
+                'field_name_aliases': [],
+                'query_fields': [
+                    {
+                        'name': 'KEY',
+                        'type_name': 'java.lang.Long',
+                        'is_key_field': True,
+                        'is_notnull_constraint_field': True,
+                    },
+                    {
+                        'name': 'VALUE',
+                        'type_name': 'java.lang.String',
+                    },
+                ],
+            },
+        ],
+    })
+
+    cache.put(test_key, test_value)
+
+    args_to_check = [
+        ('schema', schema_name),
+        ('cache', cache),
+        ('cache', cache.name),
+        ('cache', cache.cache_id)
+    ]
+
+    for param, value in args_to_check:
+        page = client.sql(f'select value from {table_name}', **{param: value})
+        received = next(page)[0]
+        assert test_value == received