You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@superset.apache.org by vi...@apache.org on 2022/04/08 17:32:48 UTC
[superset] branch master updated: test(jinja): refactor to functional tests (#19606)
This is an automated email from the ASF dual-hosted git repository.
villebro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/superset.git
The following commit(s) were added to refs/heads/master by this push:
new 9a9e3b6e3b test(jinja): refactor to functional tests (#19606)
9a9e3b6e3b is described below
commit 9a9e3b6e3bafd9a0d58b2f49404c19b31d2bc48a
Author: Ville Brofeldt <33...@users.noreply.github.com>
AuthorDate: Fri Apr 8 20:32:38 2022 +0300
test(jinja): refactor to functional tests (#19606)
---
tests/integration_tests/jinja_context_tests.py | 422 -------------------------
tests/integration_tests/test_jinja_context.py | 190 +++++++++++
tests/unit_tests/test_jinja_context.py | 268 ++++++++++++++++
3 files changed, 458 insertions(+), 422 deletions(-)
diff --git a/tests/integration_tests/jinja_context_tests.py b/tests/integration_tests/jinja_context_tests.py
deleted file mode 100644
index 924e93e17e..0000000000
--- a/tests/integration_tests/jinja_context_tests.py
+++ /dev/null
@@ -1,422 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-import json
-from datetime import datetime
-from typing import Any
-from unittest import mock
-
-import pytest
-from sqlalchemy.dialects.postgresql import dialect
-
-import superset.utils.database
-import tests.integration_tests.test_app
-from superset import app
-from superset.exceptions import SupersetTemplateException
-from superset.jinja_context import ExtraCache, get_template_processor, safe_proxy
-from superset.utils import core as utils
-from tests.integration_tests.base_tests import SupersetTestCase
-
-
-class TestJinja2Context(SupersetTestCase):
- def test_filter_values_default(self) -> None:
- with app.test_request_context():
- cache = ExtraCache()
- self.assertEqual(cache.filter_values("name", "foo"), ["foo"])
- self.assertEqual(cache.removed_filters, list())
-
- def test_filter_values_remove_not_present(self) -> None:
- with app.test_request_context():
- cache = ExtraCache()
- self.assertEqual(cache.filter_values("name", remove_filter=True), [])
- self.assertEqual(cache.removed_filters, list())
-
- def test_get_filters_remove_not_present(self) -> None:
- with app.test_request_context():
- cache = ExtraCache()
- self.assertEqual(cache.get_filters("name", remove_filter=True), [])
- self.assertEqual(cache.removed_filters, list())
-
- def test_filter_values_no_default(self) -> None:
- with app.test_request_context():
- cache = ExtraCache()
- self.assertEqual(cache.filter_values("name"), [])
-
- def test_filter_values_adhoc_filters(self) -> None:
- with app.test_request_context(
- data={
- "form_data": json.dumps(
- {
- "adhoc_filters": [
- {
- "clause": "WHERE",
- "comparator": "foo",
- "expressionType": "SIMPLE",
- "operator": "in",
- "subject": "name",
- }
- ],
- }
- )
- }
- ):
- cache = ExtraCache()
- self.assertEqual(cache.filter_values("name"), ["foo"])
- self.assertEqual(cache.applied_filters, ["name"])
-
- with app.test_request_context(
- data={
- "form_data": json.dumps(
- {
- "adhoc_filters": [
- {
- "clause": "WHERE",
- "comparator": ["foo", "bar"],
- "expressionType": "SIMPLE",
- "operator": "in",
- "subject": "name",
- }
- ],
- }
- )
- }
- ):
- cache = ExtraCache()
- self.assertEqual(cache.filter_values("name"), ["foo", "bar"])
- self.assertEqual(cache.applied_filters, ["name"])
-
- def test_get_filters_adhoc_filters(self) -> None:
- with app.test_request_context(
- data={
- "form_data": json.dumps(
- {
- "adhoc_filters": [
- {
- "clause": "WHERE",
- "comparator": "foo",
- "expressionType": "SIMPLE",
- "operator": "in",
- "subject": "name",
- }
- ],
- }
- )
- }
- ):
- cache = ExtraCache()
- self.assertEqual(
- cache.get_filters("name"), [{"op": "IN", "col": "name", "val": ["foo"]}]
- )
- self.assertEqual(cache.removed_filters, list())
- self.assertEqual(cache.applied_filters, ["name"])
-
- with app.test_request_context(
- data={
- "form_data": json.dumps(
- {
- "adhoc_filters": [
- {
- "clause": "WHERE",
- "comparator": ["foo", "bar"],
- "expressionType": "SIMPLE",
- "operator": "in",
- "subject": "name",
- }
- ],
- }
- )
- }
- ):
- cache = ExtraCache()
- self.assertEqual(
- cache.get_filters("name"),
- [{"op": "IN", "col": "name", "val": ["foo", "bar"]}],
- )
- self.assertEqual(cache.removed_filters, list())
-
- with app.test_request_context(
- data={
- "form_data": json.dumps(
- {
- "adhoc_filters": [
- {
- "clause": "WHERE",
- "comparator": ["foo", "bar"],
- "expressionType": "SIMPLE",
- "operator": "in",
- "subject": "name",
- }
- ],
- }
- )
- }
- ):
- cache = ExtraCache()
- self.assertEqual(
- cache.get_filters("name", remove_filter=True),
- [{"op": "IN", "col": "name", "val": ["foo", "bar"]}],
- )
- self.assertEqual(cache.removed_filters, ["name"])
- self.assertEqual(cache.applied_filters, ["name"])
-
- def test_filter_values_extra_filters(self) -> None:
- with app.test_request_context(
- data={
- "form_data": json.dumps(
- {"extra_filters": [{"col": "name", "op": "in", "val": "foo"}]}
- )
- }
- ):
- cache = ExtraCache()
- self.assertEqual(cache.filter_values("name"), ["foo"])
- self.assertEqual(cache.applied_filters, ["name"])
-
- def test_url_param_default(self) -> None:
- with app.test_request_context():
- cache = ExtraCache()
- self.assertEqual(cache.url_param("foo", "bar"), "bar")
-
- def test_url_param_no_default(self) -> None:
- with app.test_request_context():
- cache = ExtraCache()
- self.assertEqual(cache.url_param("foo"), None)
-
- def test_url_param_query(self) -> None:
- with app.test_request_context(query_string={"foo": "bar"}):
- cache = ExtraCache()
- self.assertEqual(cache.url_param("foo"), "bar")
-
- def test_url_param_form_data(self) -> None:
- with app.test_request_context(
- query_string={"form_data": json.dumps({"url_params": {"foo": "bar"}})}
- ):
- cache = ExtraCache()
- self.assertEqual(cache.url_param("foo"), "bar")
-
- def test_url_param_escaped_form_data(self) -> None:
- with app.test_request_context(
- query_string={"form_data": json.dumps({"url_params": {"foo": "O'Brien"}})}
- ):
- cache = ExtraCache(dialect=dialect())
- self.assertEqual(cache.url_param("foo"), "O''Brien")
-
- def test_url_param_escaped_default_form_data(self) -> None:
- with app.test_request_context(
- query_string={"form_data": json.dumps({"url_params": {"foo": "O'Brien"}})}
- ):
- cache = ExtraCache(dialect=dialect())
- self.assertEqual(cache.url_param("bar", "O'Malley"), "O''Malley")
-
- def test_url_param_unescaped_form_data(self) -> None:
- with app.test_request_context(
- query_string={"form_data": json.dumps({"url_params": {"foo": "O'Brien"}})}
- ):
- cache = ExtraCache(dialect=dialect())
- self.assertEqual(cache.url_param("foo", escape_result=False), "O'Brien")
-
- def test_url_param_unescaped_default_form_data(self) -> None:
- with app.test_request_context(
- query_string={"form_data": json.dumps({"url_params": {"foo": "O'Brien"}})}
- ):
- cache = ExtraCache(dialect=dialect())
- self.assertEqual(
- cache.url_param("bar", "O'Malley", escape_result=False), "O'Malley"
- )
-
- def test_safe_proxy_primitive(self) -> None:
- def func(input: Any) -> Any:
- return input
-
- return_value = safe_proxy(func, "foo")
- self.assertEqual("foo", return_value)
-
- def test_safe_proxy_dict(self) -> None:
- def func(input: Any) -> Any:
- return input
-
- return_value = safe_proxy(func, {"foo": "bar"})
- self.assertEqual({"foo": "bar"}, return_value)
-
- def test_safe_proxy_lambda(self) -> None:
- def func(input: Any) -> Any:
- return input
-
- with pytest.raises(SupersetTemplateException):
- safe_proxy(func, lambda: "bar")
-
- def test_safe_proxy_nested_lambda(self) -> None:
- def func(input: Any) -> Any:
- return input
-
- with pytest.raises(SupersetTemplateException):
- safe_proxy(func, {"foo": lambda: "bar"})
-
- def test_process_template(self) -> None:
- maindb = superset.utils.database.get_example_database()
- sql = "SELECT '{{ 1+1 }}'"
- tp = get_template_processor(database=maindb)
- rendered = tp.process_template(sql)
- self.assertEqual("SELECT '2'", rendered)
-
- def test_get_template_kwarg(self) -> None:
- maindb = superset.utils.database.get_example_database()
- s = "{{ foo }}"
- tp = get_template_processor(database=maindb, foo="bar")
- rendered = tp.process_template(s)
- self.assertEqual("bar", rendered)
-
- def test_template_kwarg(self) -> None:
- maindb = superset.utils.database.get_example_database()
- s = "{{ foo }}"
- tp = get_template_processor(database=maindb)
- rendered = tp.process_template(s, foo="bar")
- self.assertEqual("bar", rendered)
-
- def test_get_template_kwarg_dict(self) -> None:
- maindb = superset.utils.database.get_example_database()
- s = "{{ foo.bar }}"
- tp = get_template_processor(database=maindb, foo={"bar": "baz"})
- rendered = tp.process_template(s)
- self.assertEqual("baz", rendered)
-
- def test_template_kwarg_dict(self) -> None:
- maindb = superset.utils.database.get_example_database()
- s = "{{ foo.bar }}"
- tp = get_template_processor(database=maindb)
- rendered = tp.process_template(s, foo={"bar": "baz"})
- self.assertEqual("baz", rendered)
-
- def test_get_template_kwarg_lambda(self) -> None:
- maindb = superset.utils.database.get_example_database()
- s = "{{ foo() }}"
- tp = get_template_processor(database=maindb, foo=lambda: "bar")
- with pytest.raises(SupersetTemplateException):
- tp.process_template(s)
-
- def test_template_kwarg_lambda(self) -> None:
- maindb = superset.utils.database.get_example_database()
- s = "{{ foo() }}"
- tp = get_template_processor(database=maindb)
- with pytest.raises(SupersetTemplateException):
- tp.process_template(s, foo=lambda: "bar")
-
- def test_get_template_kwarg_module(self) -> None:
- maindb = superset.utils.database.get_example_database()
- s = "{{ dt(2017, 1, 1).isoformat() }}"
- tp = get_template_processor(database=maindb, dt=datetime)
- with pytest.raises(SupersetTemplateException):
- tp.process_template(s)
-
- def test_template_kwarg_module(self) -> None:
- maindb = superset.utils.database.get_example_database()
- s = "{{ dt(2017, 1, 1).isoformat() }}"
- tp = get_template_processor(database=maindb)
- with pytest.raises(SupersetTemplateException):
- tp.process_template(s, dt=datetime)
-
- def test_get_template_kwarg_nested_module(self) -> None:
- maindb = superset.utils.database.get_example_database()
- s = "{{ foo.dt }}"
- tp = get_template_processor(database=maindb, foo={"dt": datetime})
- with pytest.raises(SupersetTemplateException):
- tp.process_template(s)
-
- def test_template_kwarg_nested_module(self) -> None:
- maindb = superset.utils.database.get_example_database()
- s = "{{ foo.dt }}"
- tp = get_template_processor(database=maindb)
- with pytest.raises(SupersetTemplateException):
- tp.process_template(s, foo={"bar": datetime})
-
- @mock.patch("superset.jinja_context.HiveTemplateProcessor.latest_partition")
- def test_template_hive(self, lp_mock) -> None:
- lp_mock.return_value = "the_latest"
- db = mock.Mock()
- db.backend = "hive"
- s = "{{ hive.latest_partition('my_table') }}"
- tp = get_template_processor(database=db)
- rendered = tp.process_template(s)
- self.assertEqual("the_latest", rendered)
-
- @mock.patch("superset.jinja_context.context_addons")
- def test_template_context_addons(self, addons_mock) -> None:
- addons_mock.return_value = {"datetime": datetime}
- maindb = superset.utils.database.get_example_database()
- s = "SELECT '{{ datetime(2017, 1, 1).isoformat() }}'"
- tp = get_template_processor(database=maindb)
- rendered = tp.process_template(s)
- self.assertEqual("SELECT '2017-01-01T00:00:00'", rendered)
-
- @mock.patch(
- "tests.integration_tests.superset_test_custom_template_processors.datetime"
- )
- def test_custom_process_template(self, mock_dt) -> None:
- """Test macro defined in custom template processor works."""
- mock_dt.utcnow = mock.Mock(return_value=datetime(1970, 1, 1))
- db = mock.Mock()
- db.backend = "db_for_macros_testing"
- tp = get_template_processor(database=db)
-
- sql = "SELECT '$DATE()'"
- rendered = tp.process_template(sql)
- self.assertEqual("SELECT '{}'".format("1970-01-01"), rendered)
-
- sql = "SELECT '$DATE(1, 2)'"
- rendered = tp.process_template(sql)
- self.assertEqual("SELECT '{}'".format("1970-01-02"), rendered)
-
- def test_custom_get_template_kwarg(self) -> None:
- """Test macro passed as kwargs when getting template processor
- works in custom template processor."""
- db = mock.Mock()
- db.backend = "db_for_macros_testing"
- s = "$foo()"
- tp = get_template_processor(database=db, foo=lambda: "bar")
- rendered = tp.process_template(s)
- self.assertEqual("bar", rendered)
-
- def test_custom_template_kwarg(self) -> None:
- """Test macro passed as kwargs when processing template
- works in custom template processor."""
- db = mock.Mock()
- db.backend = "db_for_macros_testing"
- s = "$foo()"
- tp = get_template_processor(database=db)
- rendered = tp.process_template(s, foo=lambda: "bar")
- self.assertEqual("bar", rendered)
-
- def test_custom_template_processors_overwrite(self) -> None:
- """Test template processor for presto gets overwritten by custom one."""
- db = mock.Mock()
- db.backend = "db_for_macros_testing"
- tp = get_template_processor(database=db)
-
- sql = "SELECT '{{ datetime(2017, 1, 1).isoformat() }}'"
- rendered = tp.process_template(sql)
- self.assertEqual(sql, rendered)
-
- sql = "SELECT '{{ DATE(1, 2) }}'"
- rendered = tp.process_template(sql)
- self.assertEqual(sql, rendered)
-
- def test_custom_template_processors_ignored(self) -> None:
- """Test custom template processor is ignored for a difference backend
- database."""
- maindb = superset.utils.database.get_example_database()
- sql = "SELECT '$DATE()'"
- tp = get_template_processor(database=maindb)
- rendered = tp.process_template(sql)
- assert sql == rendered
diff --git a/tests/integration_tests/test_jinja_context.py b/tests/integration_tests/test_jinja_context.py
new file mode 100644
index 0000000000..879881a299
--- /dev/null
+++ b/tests/integration_tests/test_jinja_context.py
@@ -0,0 +1,190 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from datetime import datetime
+from unittest import mock
+
+import pytest
+from flask.ctx import AppContext
+from pytest_mock import MockFixture
+
+import superset.utils.database
+from superset.exceptions import SupersetTemplateException
+from superset.jinja_context import get_template_processor
+
+
+def test_process_template(app_context: AppContext) -> None:
+ maindb = superset.utils.database.get_example_database()
+ template = "SELECT '{{ 1+1 }}'"
+ tp = get_template_processor(database=maindb)
+ assert tp.process_template(template) == "SELECT '2'"
+
+
+def test_get_template_kwarg(app_context: AppContext) -> None:
+ maindb = superset.utils.database.get_example_database()
+ template = "{{ foo }}"
+ tp = get_template_processor(database=maindb, foo="bar")
+ assert tp.process_template(template) == "bar"
+
+
+def test_template_kwarg(app_context: AppContext) -> None:
+ maindb = superset.utils.database.get_example_database()
+ template = "{{ foo }}"
+ tp = get_template_processor(database=maindb)
+ assert tp.process_template(template, foo="bar") == "bar"
+
+
+def test_get_template_kwarg_dict(app_context: AppContext) -> None:
+ maindb = superset.utils.database.get_example_database()
+ template = "{{ foo.bar }}"
+ tp = get_template_processor(database=maindb, foo={"bar": "baz"})
+ assert tp.process_template(template) == "baz"
+
+
+def test_template_kwarg_dict(app_context: AppContext) -> None:
+ maindb = superset.utils.database.get_example_database()
+ template = "{{ foo.bar }}"
+ tp = get_template_processor(database=maindb)
+ assert tp.process_template(template, foo={"bar": "baz"}) == "baz"
+
+
+def test_get_template_kwarg_lambda(app_context: AppContext) -> None:
+ maindb = superset.utils.database.get_example_database()
+ template = "{{ foo() }}"
+ tp = get_template_processor(database=maindb, foo=lambda: "bar")
+ with pytest.raises(SupersetTemplateException):
+ tp.process_template(template)
+
+
+def test_template_kwarg_lambda(app_context: AppContext) -> None:
+ maindb = superset.utils.database.get_example_database()
+ template = "{{ foo() }}"
+ tp = get_template_processor(database=maindb)
+ with pytest.raises(SupersetTemplateException):
+ tp.process_template(template, foo=lambda: "bar")
+
+
+def test_get_template_kwarg_module(app_context: AppContext) -> None:
+ maindb = superset.utils.database.get_example_database()
+ template = "{{ dt(2017, 1, 1).isoformat() }}"
+ tp = get_template_processor(database=maindb, dt=datetime)
+ with pytest.raises(SupersetTemplateException):
+ tp.process_template(template)
+
+
+def test_template_kwarg_module(app_context: AppContext) -> None:
+ maindb = superset.utils.database.get_example_database()
+ template = "{{ dt(2017, 1, 1).isoformat() }}"
+ tp = get_template_processor(database=maindb)
+ with pytest.raises(SupersetTemplateException):
+ tp.process_template(template, dt=datetime)
+
+
+def test_get_template_kwarg_nested_module(app_context: AppContext) -> None:
+ maindb = superset.utils.database.get_example_database()
+ template = "{{ foo.dt }}"
+ tp = get_template_processor(database=maindb, foo={"dt": datetime})
+ with pytest.raises(SupersetTemplateException):
+ tp.process_template(template)
+
+
+def test_template_kwarg_nested_module(app_context: AppContext) -> None:
+ maindb = superset.utils.database.get_example_database()
+ template = "{{ foo.dt }}"
+ tp = get_template_processor(database=maindb)
+ with pytest.raises(SupersetTemplateException):
+ tp.process_template(template, foo={"bar": datetime})
+
+
+def test_template_hive(app_context: AppContext, mocker: MockFixture) -> None:
+ lp_mock = mocker.patch(
+ "superset.jinja_context.HiveTemplateProcessor.latest_partition"
+ )
+ lp_mock.return_value = "the_latest"
+ db = mock.Mock()
+ db.backend = "hive"
+ template = "{{ hive.latest_partition('my_table') }}"
+ tp = get_template_processor(database=db)
+ assert tp.process_template(template) == "the_latest"
+
+
+def test_template_context_addons(app_context: AppContext, mocker: MockFixture) -> None:
+ addons_mock = mocker.patch("superset.jinja_context.context_addons")
+ addons_mock.return_value = {"datetime": datetime}
+ maindb = superset.utils.database.get_example_database()
+ template = "SELECT '{{ datetime(2017, 1, 1).isoformat() }}'"
+ tp = get_template_processor(database=maindb)
+ assert tp.process_template(template) == "SELECT '2017-01-01T00:00:00'"
+
+
+def test_custom_process_template(app_context: AppContext, mocker: MockFixture) -> None:
+ """Test macro defined in custom template processor works."""
+
+ mock_dt = mocker.patch(
+ "tests.integration_tests.superset_test_custom_template_processors.datetime"
+ )
+ mock_dt.utcnow = mock.Mock(return_value=datetime(1970, 1, 1))
+ db = mock.Mock()
+ db.backend = "db_for_macros_testing"
+ tp = get_template_processor(database=db)
+
+ template = "SELECT '$DATE()'"
+ assert tp.process_template(template) == f"SELECT '1970-01-01'"
+
+ template = "SELECT '$DATE(1, 2)'"
+ assert tp.process_template(template) == "SELECT '1970-01-02'"
+
+
+def test_custom_get_template_kwarg(app_context: AppContext) -> None:
+ """Test macro passed as kwargs when getting template processor
+ works in custom template processor."""
+ db = mock.Mock()
+ db.backend = "db_for_macros_testing"
+ template = "$foo()"
+ tp = get_template_processor(database=db, foo=lambda: "bar")
+ assert tp.process_template(template) == "bar"
+
+
+def test_custom_template_kwarg(app_context: AppContext) -> None:
+ """Test macro passed as kwargs when processing template
+ works in custom template processor."""
+ db = mock.Mock()
+ db.backend = "db_for_macros_testing"
+ template = "$foo()"
+ tp = get_template_processor(database=db)
+ assert tp.process_template(template, foo=lambda: "bar") == "bar"
+
+
+def test_custom_template_processors_overwrite(app_context: AppContext) -> None:
+ """Test template processor for presto gets overwritten by custom one."""
+ db = mock.Mock()
+ db.backend = "db_for_macros_testing"
+ tp = get_template_processor(database=db)
+
+ template = "SELECT '{{ datetime(2017, 1, 1).isoformat() }}'"
+ assert tp.process_template(template) == template
+
+ template = "SELECT '{{ DATE(1, 2) }}'"
+ assert tp.process_template(template) == template
+
+
+def test_custom_template_processors_ignored(app_context: AppContext) -> None:
+ """Test custom template processor is ignored for a difference backend
+ database."""
+ maindb = superset.utils.database.get_example_database()
+ template = "SELECT '$DATE()'"
+ tp = get_template_processor(database=maindb)
+ assert tp.process_template(template) == template
diff --git a/tests/unit_tests/test_jinja_context.py b/tests/unit_tests/test_jinja_context.py
new file mode 100644
index 0000000000..7c301c88ea
--- /dev/null
+++ b/tests/unit_tests/test_jinja_context.py
@@ -0,0 +1,268 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from typing import Any
+
+import pytest
+from flask.ctx import AppContext
+from sqlalchemy.dialects.postgresql import dialect
+
+from superset import app
+from superset.exceptions import SupersetTemplateException
+from superset.jinja_context import ExtraCache, safe_proxy
+
+
+def test_filter_values_default(app_context: AppContext) -> None:
+ cache = ExtraCache()
+ assert cache.filter_values("name", "foo") == ["foo"]
+ assert cache.removed_filters == []
+
+
+def test_filter_values_remove_not_present(app_context: AppContext) -> None:
+ cache = ExtraCache()
+ assert cache.filter_values("name", remove_filter=True) == []
+ assert cache.removed_filters == []
+
+
+def test_get_filters_remove_not_present(app_context: AppContext) -> None:
+ cache = ExtraCache()
+ assert cache.get_filters("name", remove_filter=True) == []
+ assert cache.removed_filters == []
+
+
+def test_filter_values_no_default(app_context: AppContext) -> None:
+ cache = ExtraCache()
+ assert cache.filter_values("name") == []
+
+
+def test_filter_values_adhoc_filters(app_context: AppContext) -> None:
+ with app.test_request_context(
+ data={
+ "form_data": json.dumps(
+ {
+ "adhoc_filters": [
+ {
+ "clause": "WHERE",
+ "comparator": "foo",
+ "expressionType": "SIMPLE",
+ "operator": "in",
+ "subject": "name",
+ }
+ ],
+ }
+ )
+ }
+ ):
+ cache = ExtraCache()
+ assert cache.filter_values("name") == ["foo"]
+ assert cache.applied_filters == ["name"]
+
+ with app.test_request_context(
+ data={
+ "form_data": json.dumps(
+ {
+ "adhoc_filters": [
+ {
+ "clause": "WHERE",
+ "comparator": ["foo", "bar"],
+ "expressionType": "SIMPLE",
+ "operator": "in",
+ "subject": "name",
+ }
+ ],
+ }
+ )
+ }
+ ):
+ cache = ExtraCache()
+ assert cache.filter_values("name") == ["foo", "bar"]
+ assert cache.applied_filters == ["name"]
+
+
+def test_get_filters_adhoc_filters(app_context: AppContext) -> None:
+ with app.test_request_context(
+ data={
+ "form_data": json.dumps(
+ {
+ "adhoc_filters": [
+ {
+ "clause": "WHERE",
+ "comparator": "foo",
+ "expressionType": "SIMPLE",
+ "operator": "in",
+ "subject": "name",
+ }
+ ],
+ }
+ )
+ }
+ ):
+ cache = ExtraCache()
+ assert cache.get_filters("name") == [
+ {"op": "IN", "col": "name", "val": ["foo"]}
+ ]
+
+ assert cache.removed_filters == []
+ assert cache.applied_filters == ["name"]
+
+ with app.test_request_context(
+ data={
+ "form_data": json.dumps(
+ {
+ "adhoc_filters": [
+ {
+ "clause": "WHERE",
+ "comparator": ["foo", "bar"],
+ "expressionType": "SIMPLE",
+ "operator": "in",
+ "subject": "name",
+ }
+ ],
+ }
+ )
+ }
+ ):
+ cache = ExtraCache()
+ assert cache.get_filters("name") == [
+ {"op": "IN", "col": "name", "val": ["foo", "bar"]}
+ ]
+ assert cache.removed_filters == []
+
+ with app.test_request_context(
+ data={
+ "form_data": json.dumps(
+ {
+ "adhoc_filters": [
+ {
+ "clause": "WHERE",
+ "comparator": ["foo", "bar"],
+ "expressionType": "SIMPLE",
+ "operator": "in",
+ "subject": "name",
+ }
+ ],
+ }
+ )
+ }
+ ):
+ cache = ExtraCache()
+ assert cache.get_filters("name", remove_filter=True) == [
+ {"op": "IN", "col": "name", "val": ["foo", "bar"]}
+ ]
+ assert cache.removed_filters == ["name"]
+ assert cache.applied_filters == ["name"]
+
+
+def test_filter_values_extra_filters(app_context: AppContext) -> None:
+ with app.test_request_context(
+ data={
+ "form_data": json.dumps(
+ {"extra_filters": [{"col": "name", "op": "in", "val": "foo"}]}
+ )
+ }
+ ):
+ cache = ExtraCache()
+ assert cache.filter_values("name") == ["foo"]
+ assert cache.applied_filters == ["name"]
+
+
+def test_url_param_default(app_context: AppContext) -> None:
+ with app.test_request_context():
+ cache = ExtraCache()
+ assert cache.url_param("foo", "bar") == "bar"
+
+
+def test_url_param_no_default(app_context: AppContext) -> None:
+ with app.test_request_context():
+ cache = ExtraCache()
+ assert cache.url_param("foo") is None
+
+
+def test_url_param_query(app_context: AppContext) -> None:
+ with app.test_request_context(query_string={"foo": "bar"}):
+ cache = ExtraCache()
+ assert cache.url_param("foo") == "bar"
+
+
+def test_url_param_form_data(app_context: AppContext) -> None:
+ with app.test_request_context(
+ query_string={"form_data": json.dumps({"url_params": {"foo": "bar"}})}
+ ):
+ cache = ExtraCache()
+ assert cache.url_param("foo") == "bar"
+
+
+def test_url_param_escaped_form_data(app_context: AppContext) -> None:
+ with app.test_request_context(
+ query_string={"form_data": json.dumps({"url_params": {"foo": "O'Brien"}})}
+ ):
+ cache = ExtraCache(dialect=dialect())
+ assert cache.url_param("foo") == "O''Brien"
+
+
+def test_url_param_escaped_default_form_data(app_context: AppContext) -> None:
+ with app.test_request_context(
+ query_string={"form_data": json.dumps({"url_params": {"foo": "O'Brien"}})}
+ ):
+ cache = ExtraCache(dialect=dialect())
+ assert cache.url_param("bar", "O'Malley") == "O''Malley"
+
+
+def test_url_param_unescaped_form_data(app_context: AppContext) -> None:
+ with app.test_request_context(
+ query_string={"form_data": json.dumps({"url_params": {"foo": "O'Brien"}})}
+ ):
+ cache = ExtraCache(dialect=dialect())
+ assert cache.url_param("foo", escape_result=False) == "O'Brien"
+
+
+def test_url_param_unescaped_default_form_data(app_context: AppContext) -> None:
+ with app.test_request_context(
+ query_string={"form_data": json.dumps({"url_params": {"foo": "O'Brien"}})}
+ ):
+ cache = ExtraCache(dialect=dialect())
+ assert cache.url_param("bar", "O'Malley", escape_result=False) == "O'Malley"
+
+
+def test_safe_proxy_primitive(app_context: AppContext) -> None:
+ def func(input_: Any) -> Any:
+ return input_
+
+ assert safe_proxy(func, "foo") == "foo"
+
+
+def test_safe_proxy_dict(app_context: AppContext) -> None:
+ def func(input_: Any) -> Any:
+ return input_
+
+ assert safe_proxy(func, {"foo": "bar"}) == {"foo": "bar"}
+
+
+def test_safe_proxy_lambda(app_context: AppContext) -> None:
+ def func(input_: Any) -> Any:
+ return input_
+
+ with pytest.raises(SupersetTemplateException):
+ safe_proxy(func, lambda: "bar")
+
+
+def test_safe_proxy_nested_lambda(app_context: AppContext) -> None:
+ def func(input_: Any) -> Any:
+ return input_
+
+ with pytest.raises(SupersetTemplateException):
+ safe_proxy(func, {"foo": lambda: "bar"})