You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain text copy.
Posted to commits@superset.apache.org by el...@apache.org on 2023/06/13 23:39:13 UTC

[superset] 02/07: fix: check sqlalchemy_uri (#23901)

This is an automated email from the ASF dual-hosted git repository.

elizabeth pushed a commit to branch elizabeth/test-2.1.1
in repository https://gitbox.apache.org/repos/asf/superset.git

commit 831978f0f749d04cb786ec963678792dd6870d25
Author: Daniel Vaz Gaspar <da...@gmail.com>
AuthorDate: Wed May 3 11:14:03 2023 +0100

    fix: check sqlalchemy_uri (#23901)
    
    (cherry picked from commit e5f512e348bb335816e2ceff4680167f477158de)
---
 superset/security/analytics_db_safety.py           | 14 ++--
 .../security/analytics_db_safety_tests.py          | 84 +++++++++++++++++-----
 2 files changed, 75 insertions(+), 23 deletions(-)

diff --git a/superset/security/analytics_db_safety.py b/superset/security/analytics_db_safety.py
index bfa93613e1..29583b5255 100644
--- a/superset/security/analytics_db_safety.py
+++ b/superset/security/analytics_db_safety.py
@@ -14,6 +14,8 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import re
+
 from flask_babel import lazy_gettext as _
 from sqlalchemy.engine.url import URL
 from sqlalchemy.exc import NoSuchModuleError
@@ -24,18 +26,20 @@ from superset.exceptions import SupersetSecurityException
 # list of unsafe SQLAlchemy dialects
 BLOCKLIST = {
     # sqlite creates a local DB, which allows mapping server's filesystem
-    "sqlite",
+    re.compile(r"sqlite(?:\+[^\s]*)?$"),
     # shillelagh allows opening local files (eg, 'SELECT * FROM "csv:///etc/passwd"')
-    "shillelagh",
-    "shillelagh+apsw",
+    re.compile(r"shillelagh$"),
+    re.compile(r"shillelagh\+apsw$"),
 }
 
 
 def check_sqlalchemy_uri(uri: URL) -> None:
-    if uri.drivername in BLOCKLIST:
+    for blocklist_regex in BLOCKLIST:
+        if not re.match(blocklist_regex, uri.drivername):
+            continue
         try:
             dialect = uri.get_dialect().__name__
-        except NoSuchModuleError:
+        except (NoSuchModuleError, ValueError):
             dialect = uri.drivername
 
         raise SupersetSecurityException(
diff --git a/tests/integration_tests/security/analytics_db_safety_tests.py b/tests/integration_tests/security/analytics_db_safety_tests.py
index f6518fe935..7e36268e30 100644
--- a/tests/integration_tests/security/analytics_db_safety_tests.py
+++ b/tests/integration_tests/security/analytics_db_safety_tests.py
@@ -14,30 +14,78 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from typing import Optional
 
 import pytest
 from sqlalchemy.engine.url import make_url
 
 from superset.exceptions import SupersetSecurityException
 from superset.security.analytics_db_safety import check_sqlalchemy_uri
-from tests.integration_tests.base_tests import SupersetTestCase
 
 
-class TestDBConnections(SupersetTestCase):
-    def test_check_sqlalchemy_uri_ok(self):
-        check_sqlalchemy_uri(make_url("postgres://user:password@test.com"))
-
-    def test_check_sqlalchemy_url_sqlite(self):
-        with pytest.raises(SupersetSecurityException) as excinfo:
-            check_sqlalchemy_uri(make_url("sqlite:///home/superset/bad.db"))
-        assert (
-            str(excinfo.value)
-            == "SQLiteDialect_pysqlite cannot be used as a data source for security reasons."
-        )
-
+@pytest.mark.parametrize(
+    "sqlalchemy_uri, error, error_message",
+    [
+        ("postgres://user:password@test.com", False, None),
+        (
+            "sqlite:///home/superset/bad.db",
+            True,
+            "SQLiteDialect_pysqlite cannot be used as a data source for security reasons.",
+        ),
+        (
+            "sqlite+pysqlite:///home/superset/bad.db",
+            True,
+            "SQLiteDialect_pysqlite cannot be used as a data source for security reasons.",
+        ),
+        (
+            "sqlite+aiosqlite:///home/superset/bad.db",
+            True,
+            "SQLiteDialect_pysqlite cannot be used as a data source for security reasons.",
+        ),
+        (
+            "sqlite+pysqlcipher:///home/superset/bad.db",
+            True,
+            "SQLiteDialect_pysqlite cannot be used as a data source for security reasons.",
+        ),
+        (
+            "sqlite+:///home/superset/bad.db",
+            True,
+            "SQLiteDialect_pysqlite cannot be used as a data source for security reasons.",
+        ),
+        (
+            "sqlite+new+driver:///home/superset/bad.db",
+            True,
+            "SQLiteDialect_pysqlite cannot be used as a data source for security reasons.",
+        ),
+        (
+            "sqlite+new+:///home/superset/bad.db",
+            True,
+            "SQLiteDialect_pysqlite cannot be used as a data source for security reasons.",
+        ),
+        (
+            "shillelagh:///home/superset/bad.db",
+            True,
+            "shillelagh cannot be used as a data source for security reasons.",
+        ),
+        (
+            "shillelagh+apsw:///home/superset/bad.db",
+            True,
+            "shillelagh cannot be used as a data source for security reasons.",
+        ),
+        ("shillelagh+:///home/superset/bad.db", False, None),
+        (
+            "shillelagh+something:///home/superset/bad.db",
+            False,
+            None,
+        ),
+    ],
+)
+def test_check_sqlalchemy_uri(
+    sqlalchemy_uri: str, error: bool, error_message: Optional[str]
+):
+    if error:
         with pytest.raises(SupersetSecurityException) as excinfo:
-            check_sqlalchemy_uri(make_url("shillelagh:///home/superset/bad.db"))
-        assert (
-            str(excinfo.value)
-            == "shillelagh cannot be used as a data source for security reasons."
-        )
+            check_sqlalchemy_uri(make_url(sqlalchemy_uri))
+            assert str(excinfo.value) == error_message
+    else:
+        check_sqlalchemy_uri(make_url(sqlalchemy_uri))