You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@superset.apache.org by el...@apache.org on 2023/06/13 23:39:14 UTC

[superset] 03/07: fix: permission checks on import (#23200)

This is an automated email from the ASF dual-hosted git repository.

elizabeth pushed a commit to branch elizabeth/test-2.1.1
in repository https://gitbox.apache.org/repos/asf/superset.git

commit cfc2ca672e7cea0e461b0b37be5174f88e6468ca
Author: Beto Dealmeida <ro...@dealmeida.net>
AuthorDate: Wed Mar 15 08:31:09 2023 -0700

    fix: permission checks on import (#23200)
---
 superset/charts/commands/importers/v1/utils.py     |   9 +-
 superset/commands/importers/v1/examples.py         |  21 +-
 superset/dashboards/commands/importers/v1/utils.py |  10 +-
 superset/databases/commands/importers/v1/utils.py  |  13 +-
 superset/datasets/commands/importers/v1/utils.py   |   9 +-
 superset/examples/utils.py                         |   4 +-
 tests/integration_tests/charts/commands_tests.py   |  11 +-
 .../integration_tests/dashboards/commands_tests.py |  12 +-
 .../integration_tests/databases/commands_tests.py  | 310 +++++++++++++++++----
 tests/integration_tests/datasets/commands_tests.py |  17 +-
 tests/integration_tests/fixtures/importexport.py   | 105 +++++++
 .../queries/saved_queries/commands_tests.py        |  10 +-
 .../charts/commands/importers/v1/import_test.py    |  45 ++-
 .../commands/importers/v1/assets_test.py           |  16 +-
 .../commands/importers/v1/import_test.py           |  46 ++-
 .../databases/commands/importers/v1/import_test.py |  44 ++-
 .../datasets/commands/importers/v1/import_test.py  |  38 ++-
 17 files changed, 621 insertions(+), 99 deletions(-)

diff --git a/superset/charts/commands/importers/v1/utils.py b/superset/charts/commands/importers/v1/utils.py
index d4c69d3ada..cea8af7b4c 100644
--- a/superset/charts/commands/importers/v1/utils.py
+++ b/superset/charts/commands/importers/v1/utils.py
@@ -21,17 +21,24 @@ from typing import Any, Dict
 from flask import g
 from sqlalchemy.orm import Session
 
+from superset import security_manager
+from superset.commands.exceptions import ImportFailedError
 from superset.models.slice import Slice
 
 
 def import_chart(
     session: Session, config: Dict[str, Any], overwrite: bool = False
 ) -> Slice:
+    can_write = security_manager.can_access("can_write", "Chart")
     existing = session.query(Slice).filter_by(uuid=config["uuid"]).first()
     if existing:
-        if not overwrite:
+        if not overwrite or not can_write:
             return existing
         config["id"] = existing.id
+    elif not can_write:
+        raise ImportFailedError(
+            "Chart doesn't exist and user doesn't have permission to create charts"
+        )
 
     # TODO (betodealmeida): move this logic to import_from_dict
     config["params"] = json.dumps(config["params"])
diff --git a/superset/commands/importers/v1/examples.py b/superset/commands/importers/v1/examples.py
index 99aa831faa..8d14c8298a 100644
--- a/superset/commands/importers/v1/examples.py
+++ b/superset/commands/importers/v1/examples.py
@@ -21,7 +21,7 @@ from sqlalchemy.orm import Session
 from sqlalchemy.orm.exc import MultipleResultsFound
 from sqlalchemy.sql import select
 
-from superset import db
+from superset import db, security_manager
 from superset.charts.commands.importers.v1 import ImportChartsCommand
 from superset.charts.commands.importers.v1.utils import import_chart
 from superset.charts.schemas import ImportV1ChartSchema
@@ -42,7 +42,7 @@ from superset.datasets.commands.importers.v1 import ImportDatasetsCommand
 from superset.datasets.commands.importers.v1.utils import import_dataset
 from superset.datasets.schemas import ImportV1DatasetSchema
 from superset.models.dashboard import dashboard_slices
-from superset.utils.core import get_example_default_schema
+from superset.utils.core import get_example_default_schema, override_user
 from superset.utils.database import get_example_database
 
 
@@ -69,7 +69,13 @@ class ImportExamplesCommand(ImportModelsCommand):
 
         # rollback to prevent partial imports
         try:
-            self._import(db.session, self._configs, self.overwrite, self.force_data)
+            with override_user(security_manager.find_user(username="admin")):
+                self._import(
+                    db.session,
+                    self._configs,
+                    self.overwrite,
+                    self.force_data,
+                )
             db.session.commit()
         except Exception as ex:
             db.session.rollback()
@@ -119,13 +125,12 @@ class ImportExamplesCommand(ImportModelsCommand):
                 if config["schema"] is None:
                     config["schema"] = get_example_default_schema()
 
-                dataset = import_dataset(
-                    session, config, overwrite=overwrite, force_data=force_data
-                )
-
                 try:
                     dataset = import_dataset(
-                        session, config, overwrite=overwrite, force_data=force_data
+                        session,
+                        config,
+                        overwrite=overwrite,
+                        force_data=force_data,
                     )
                 except MultipleResultsFound:
                     # Multiple result can be found for datasets. There was a bug in
diff --git a/superset/dashboards/commands/importers/v1/utils.py b/superset/dashboards/commands/importers/v1/utils.py
index 513d1efcdb..30bad57cb1 100644
--- a/superset/dashboards/commands/importers/v1/utils.py
+++ b/superset/dashboards/commands/importers/v1/utils.py
@@ -22,6 +22,8 @@ from typing import Any, Dict, Set
 from flask import g
 from sqlalchemy.orm import Session
 
+from superset import security_manager
+from superset.commands.exceptions import ImportFailedError
 from superset.models.dashboard import Dashboard
 
 logger = logging.getLogger(__name__)
@@ -146,11 +148,17 @@ def update_id_refs(  # pylint: disable=too-many-locals
 def import_dashboard(
     session: Session, config: Dict[str, Any], overwrite: bool = False
 ) -> Dashboard:
+    can_write = security_manager.can_access("can_write", "Dashboard")
     existing = session.query(Dashboard).filter_by(uuid=config["uuid"]).first()
     if existing:
-        if not overwrite:
+        if not overwrite or not can_write:
             return existing
         config["id"] = existing.id
+    elif not can_write:
+        raise ImportFailedError(
+            "Dashboard doesn't exist and user doesn't "
+            "have permission to create dashboards"
+        )
 
     # TODO (betodealmeida): move this logic to import_from_dict
     config = config.copy()
diff --git a/superset/databases/commands/importers/v1/utils.py b/superset/databases/commands/importers/v1/utils.py
index 6704ccd465..673e4f86df 100644
--- a/superset/databases/commands/importers/v1/utils.py
+++ b/superset/databases/commands/importers/v1/utils.py
@@ -20,17 +20,26 @@ from typing import Any, Dict
 
 from sqlalchemy.orm import Session
 
+from superset import security_manager
+from superset.commands.exceptions import ImportFailedError
 from superset.models.core import Database
 
 
 def import_database(
-    session: Session, config: Dict[str, Any], overwrite: bool = False
+    session: Session,
+    config: Dict[str, Any],
+    overwrite: bool = False,
 ) -> Database:
+    can_write = security_manager.can_access("can_write", "Database")
     existing = session.query(Database).filter_by(uuid=config["uuid"]).first()
     if existing:
-        if not overwrite:
+        if not overwrite or not can_write:
             return existing
         config["id"] = existing.id
+    elif not can_write:
+        raise ImportFailedError(
+            "Database doesn't exist and user doesn't have permission to create databases"
+        )
 
     # https://github.com/apache/superset/pull/16756 renamed ``csv`` to ``file``.
     config["allow_file_upload"] = config.pop("allow_csv_upload")
diff --git a/superset/datasets/commands/importers/v1/utils.py b/superset/datasets/commands/importers/v1/utils.py
index b3fb2a8041..2363cf7497 100644
--- a/superset/datasets/commands/importers/v1/utils.py
+++ b/superset/datasets/commands/importers/v1/utils.py
@@ -28,6 +28,8 @@ from sqlalchemy.orm import Session
 from sqlalchemy.orm.exc import MultipleResultsFound
 from sqlalchemy.sql.visitors import VisitableType
 
+from superset import security_manager
+from superset.commands.exceptions import ImportFailedError
 from superset.connectors.sqla.models import SqlaTable
 from superset.datasets.commands.exceptions import DatasetForbiddenDataURI
 from superset.models.core import Database
@@ -104,11 +106,16 @@ def import_dataset(
     overwrite: bool = False,
     force_data: bool = False,
 ) -> SqlaTable:
+    can_write = security_manager.can_access("can_write", "Dataset")
     existing = session.query(SqlaTable).filter_by(uuid=config["uuid"]).first()
     if existing:
-        if not overwrite:
+        if not overwrite or not can_write:
             return existing
         config["id"] = existing.id
+    elif not can_write:
+        raise ImportFailedError(
+            "Dataset doesn't exist and user doesn't have permission to create datasets"
+        )
 
     # TODO (betodealmeida): move this logic to import_from_dict
     config = config.copy()
diff --git a/superset/examples/utils.py b/superset/examples/utils.py
index 4568e4f196..aea1f0f93d 100644
--- a/superset/examples/utils.py
+++ b/superset/examples/utils.py
@@ -92,7 +92,9 @@ def load_configs_from_directory(
     contents[METADATA_FILE_NAME] = yaml.dump(metadata)
 
     command = ImportExamplesCommand(
-        contents, overwrite=overwrite, force_data=force_data
+        contents,
+        overwrite=overwrite,
+        force_data=force_data,
     )
     try:
         command.run()
diff --git a/tests/integration_tests/charts/commands_tests.py b/tests/integration_tests/charts/commands_tests.py
index da9a7550ac..fb2073499b 100644
--- a/tests/integration_tests/charts/commands_tests.py
+++ b/tests/integration_tests/charts/commands_tests.py
@@ -164,9 +164,10 @@ class TestExportChartsCommand(SupersetTestCase):
 
 class TestImportChartsCommand(SupersetTestCase):
     @patch("superset.charts.commands.importers.v1.utils.g")
-    def test_import_v1_chart(self, mock_g):
+    @patch("superset.security.manager.g")
+    def test_import_v1_chart(self, sm_g, utils_g):
         """Test that we can import a chart"""
-        mock_g.user = security_manager.find_user("admin")
+        admin = sm_g.user = utils_g.user = security_manager.find_user("admin")
         contents = {
             "metadata.yaml": yaml.safe_dump(chart_metadata_config),
             "databases/imported_database.yaml": yaml.safe_dump(database_config),
@@ -227,7 +228,7 @@ class TestImportChartsCommand(SupersetTestCase):
         assert database.database_name == "imported_database"
         assert chart.table.database == database
 
-        assert chart.owners == [mock_g.user]
+        assert chart.owners == [admin]
 
         chart.owners = []
         dataset.owners = []
@@ -237,8 +238,10 @@ class TestImportChartsCommand(SupersetTestCase):
         db.session.delete(database)
         db.session.commit()
 
-    def test_import_v1_chart_multiple(self):
+    @patch("superset.security.manager.g")
+    def test_import_v1_chart_multiple(self, sm_g):
         """Test that a chart can be imported multiple times"""
+        sm_g.user = security_manager.find_user("admin")
         contents = {
             "metadata.yaml": yaml.safe_dump(chart_metadata_config),
             "databases/imported_database.yaml": yaml.safe_dump(database_config),
diff --git a/tests/integration_tests/dashboards/commands_tests.py b/tests/integration_tests/dashboards/commands_tests.py
index 927865e4a0..ad9152585e 100644
--- a/tests/integration_tests/dashboards/commands_tests.py
+++ b/tests/integration_tests/dashboards/commands_tests.py
@@ -485,9 +485,10 @@ class TestImportDashboardsCommand(SupersetTestCase):
         db.session.commit()
 
     @patch("superset.dashboards.commands.importers.v1.utils.g")
-    def test_import_v1_dashboard(self, mock_g):
+    @patch("superset.security.manager.g")
+    def test_import_v1_dashboard(self, sm_g, utils_g):
         """Test that we can import a dashboard"""
-        mock_g.user = security_manager.find_user("admin")
+        admin = sm_g.user = utils_g.user = security_manager.find_user("admin")
         contents = {
             "metadata.yaml": yaml.safe_dump(dashboard_metadata_config),
             "databases/imported_database.yaml": yaml.safe_dump(database_config),
@@ -570,7 +571,7 @@ class TestImportDashboardsCommand(SupersetTestCase):
         database = dataset.database
         assert str(database.uuid) == database_config["uuid"]
 
-        assert dashboard.owners == [mock_g.user]
+        assert dashboard.owners == [admin]
 
         dashboard.owners = []
         chart.owners = []
@@ -582,8 +583,11 @@ class TestImportDashboardsCommand(SupersetTestCase):
         db.session.delete(database)
         db.session.commit()
 
-    def test_import_v1_dashboard_multiple(self):
+    @patch("superset.security.manager.g")
+    def test_import_v1_dashboard_multiple(self, mock_g):
         """Test that a dashboard can be imported multiple times"""
+        mock_g.user = security_manager.find_user("admin")
+
         num_dashboards = db.session.query(Dashboard).count()
 
         contents = {
diff --git a/tests/integration_tests/databases/commands_tests.py b/tests/integration_tests/databases/commands_tests.py
index 7e4fcaad78..7b2d4fdd27 100644
--- a/tests/integration_tests/databases/commands_tests.py
+++ b/tests/integration_tests/databases/commands_tests.py
@@ -40,7 +40,7 @@ from superset.databases.commands.importers.v1 import ImportDatabasesCommand
 from superset.databases.commands.tables import TablesDatabaseCommand
 from superset.databases.commands.test_connection import TestConnectionDatabaseCommand
 from superset.databases.commands.validate import ValidateDatabaseParametersCommand
-from superset.databases.schemas import DatabaseTestConnectionSchema
+from superset.databases.ssh_tunnel.models import SSHTunnel
 from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
 from superset.exceptions import (
     SupersetErrorsException,
@@ -63,16 +63,19 @@ from tests.integration_tests.fixtures.energy_dashboard import (
 from tests.integration_tests.fixtures.importexport import (
     database_config,
     database_metadata_config,
+    database_with_ssh_tunnel_config_mix_credentials,
+    database_with_ssh_tunnel_config_no_credentials,
+    database_with_ssh_tunnel_config_password,
+    database_with_ssh_tunnel_config_private_key,
+    database_with_ssh_tunnel_config_private_pass_only,
     dataset_config,
     dataset_metadata_config,
 )
 
 
 class TestCreateDatabaseCommand(SupersetTestCase):
-    @mock.patch(
-        "superset.databases.commands.test_connection.event_logger.log_with_context"
-    )
-    @mock.patch("superset.utils.core.g")
+    @patch("superset.databases.commands.test_connection.event_logger.log_with_context")
+    @patch("superset.utils.core.g")
     def test_create_duplicate_error(self, mock_g, mock_logger):
         example_db = get_example_database()
         mock_g.user = security_manager.find_user("admin")
@@ -90,10 +93,8 @@ class TestCreateDatabaseCommand(SupersetTestCase):
             "DatabaseRequiredFieldValidationError"
         )
 
-    @mock.patch(
-        "superset.databases.commands.test_connection.event_logger.log_with_context"
-    )
-    @mock.patch("superset.utils.core.g")
+    @patch("superset.databases.commands.test_connection.event_logger.log_with_context")
+    @patch("superset.utils.core.g")
     def test_multiple_error_logging(self, mock_g, mock_logger):
         mock_g.user = security_manager.find_user("admin")
         command = CreateDatabaseCommand({})
@@ -395,8 +396,11 @@ class TestExportDatabasesCommand(SupersetTestCase):
 
 
 class TestImportDatabasesCommand(SupersetTestCase):
-    def test_import_v1_database(self):
+    @patch("superset.security.manager.g")
+    def test_import_v1_database(self, mock_g):
         """Test that a database can be imported"""
+        mock_g.user = security_manager.find_user("admin")
+
         contents = {
             "metadata.yaml": yaml.safe_dump(database_metadata_config),
             "databases/imported_database.yaml": yaml.safe_dump(database_config),
@@ -421,7 +425,8 @@ class TestImportDatabasesCommand(SupersetTestCase):
         db.session.delete(database)
         db.session.commit()
 
-    def test_import_v1_database_broken_csv_fields(self):
+    @patch("superset.security.manager.g")
+    def test_import_v1_database_broken_csv_fields(self, mock_g):
         """
         Test that a database can be imported with broken schema.
 
@@ -429,6 +434,8 @@ class TestImportDatabasesCommand(SupersetTestCase):
         the V1 schema. This test ensures that we can import databases that were
         exported with the broken schema.
         """
+        mock_g.user = security_manager.find_user("admin")
+
         broken_config = database_config.copy()
         broken_config["allow_file_upload"] = broken_config.pop("allow_csv_upload")
         broken_config["extra"] = {"schemas_allowed_for_file_upload": ["upload"]}
@@ -457,8 +464,11 @@ class TestImportDatabasesCommand(SupersetTestCase):
         db.session.delete(database)
         db.session.commit()
 
-    def test_import_v1_database_multiple(self):
+    @patch("superset.security.manager.g")
+    def test_import_v1_database_multiple(self, mock_g):
         """Test that a database can be imported multiple times"""
+        mock_g.user = security_manager.find_user("admin")
+
         num_databases = db.session.query(Database).count()
 
         contents = {
@@ -498,8 +508,11 @@ class TestImportDatabasesCommand(SupersetTestCase):
         db.session.delete(database)
         db.session.commit()
 
-    def test_import_v1_database_with_dataset(self):
+    @patch("superset.security.manager.g")
+    def test_import_v1_database_with_dataset(self, mock_g):
         """Test that a database can be imported with datasets"""
+        mock_g.user = security_manager.find_user("admin")
+
         contents = {
             "databases/imported_database.yaml": yaml.safe_dump(database_config),
             "datasets/imported_dataset.yaml": yaml.safe_dump(dataset_config),
@@ -518,8 +531,11 @@ class TestImportDatabasesCommand(SupersetTestCase):
         db.session.delete(database)
         db.session.commit()
 
-    def test_import_v1_database_with_dataset_multiple(self):
+    @patch("superset.security.manager.g")
+    def test_import_v1_database_with_dataset_multiple(self, mock_g):
         """Test that a database can be imported multiple times w/o changing datasets"""
+        mock_g.user = security_manager.find_user("admin")
+
         contents = {
             "databases/imported_database.yaml": yaml.safe_dump(database_config),
             "datasets/imported_dataset.yaml": yaml.safe_dump(dataset_config),
@@ -623,6 +639,200 @@ class TestImportDatabasesCommand(SupersetTestCase):
             }
         }
 
+    @patch("superset.databases.schemas.is_feature_enabled")
+    def test_import_v1_database_masked_ssh_tunnel_password(
+        self, mock_schema_is_feature_enabled
+    ):
+        """Test that database imports with masked ssh_tunnel passwords are rejected"""
+        mock_schema_is_feature_enabled.return_value = True
+        masked_database_config = database_with_ssh_tunnel_config_password.copy()
+        contents = {
+            "metadata.yaml": yaml.safe_dump(database_metadata_config),
+            "databases/imported_database.yaml": yaml.safe_dump(masked_database_config),
+        }
+        command = ImportDatabasesCommand(contents)
+        with pytest.raises(CommandInvalidError) as excinfo:
+            command.run()
+        assert str(excinfo.value) == "Error importing database"
+        assert excinfo.value.normalized_messages() == {
+            "databases/imported_database.yaml": {
+                "_schema": ["Must provide a password for the ssh tunnel"]
+            }
+        }
+
+    @patch("superset.databases.schemas.is_feature_enabled")
+    def test_import_v1_database_masked_ssh_tunnel_private_key_and_password(
+        self, mock_schema_is_feature_enabled
+    ):
+        """Test that database imports with masked ssh_tunnel private_key and private_key_password are rejected"""
+        mock_schema_is_feature_enabled.return_value = True
+        masked_database_config = database_with_ssh_tunnel_config_private_key.copy()
+        contents = {
+            "metadata.yaml": yaml.safe_dump(database_metadata_config),
+            "databases/imported_database.yaml": yaml.safe_dump(masked_database_config),
+        }
+        command = ImportDatabasesCommand(contents)
+        with pytest.raises(CommandInvalidError) as excinfo:
+            command.run()
+        assert str(excinfo.value) == "Error importing database"
+        assert excinfo.value.normalized_messages() == {
+            "databases/imported_database.yaml": {
+                "_schema": [
+                    "Must provide a private key for the ssh tunnel",
+                    "Must provide a private key password for the ssh tunnel",
+                ]
+            }
+        }
+
+    @patch("superset.databases.schemas.is_feature_enabled")
+    @patch("superset.security.manager.g")
+    def test_import_v1_database_with_ssh_tunnel_password(
+        self,
+        mock_g,
+        mock_schema_is_feature_enabled,
+    ):
+        """Test that a database with ssh_tunnel password can be imported"""
+        mock_g.user = security_manager.find_user("admin")
+        mock_schema_is_feature_enabled.return_value = True
+        masked_database_config = database_with_ssh_tunnel_config_password.copy()
+        masked_database_config["ssh_tunnel"]["password"] = "TEST"
+        contents = {
+            "metadata.yaml": yaml.safe_dump(database_metadata_config),
+            "databases/imported_database.yaml": yaml.safe_dump(masked_database_config),
+        }
+        command = ImportDatabasesCommand(contents)
+        command.run()
+
+        database = (
+            db.session.query(Database).filter_by(uuid=database_config["uuid"]).one()
+        )
+        assert database.allow_file_upload
+        assert database.allow_ctas
+        assert database.allow_cvas
+        assert database.allow_dml
+        assert not database.allow_run_async
+        assert database.cache_timeout is None
+        assert database.database_name == "imported_database"
+        assert database.expose_in_sqllab
+        assert database.extra == "{}"
+        assert database.sqlalchemy_uri == "sqlite:///test.db"
+
+        model_ssh_tunnel = (
+            db.session.query(SSHTunnel)
+            .filter(SSHTunnel.database_id == database.id)
+            .one()
+        )
+        self.assertEqual(model_ssh_tunnel.password, "TEST")
+
+        db.session.delete(database)
+        db.session.commit()
+
+    @patch("superset.databases.schemas.is_feature_enabled")
+    @patch("superset.security.manager.g")
+    def test_import_v1_database_with_ssh_tunnel_private_key_and_password(
+        self,
+        mock_g,
+        mock_schema_is_feature_enabled,
+    ):
+        """Test that a database with ssh_tunnel private_key and private_key_password can be imported"""
+        mock_g.user = security_manager.find_user("admin")
+
+        mock_schema_is_feature_enabled.return_value = True
+        masked_database_config = database_with_ssh_tunnel_config_private_key.copy()
+        masked_database_config["ssh_tunnel"]["private_key"] = "TestPrivateKey"
+        masked_database_config["ssh_tunnel"]["private_key_password"] = "TEST"
+        contents = {
+            "metadata.yaml": yaml.safe_dump(database_metadata_config),
+            "databases/imported_database.yaml": yaml.safe_dump(masked_database_config),
+        }
+        command = ImportDatabasesCommand(contents)
+        command.run()
+
+        database = (
+            db.session.query(Database).filter_by(uuid=database_config["uuid"]).one()
+        )
+        assert database.allow_file_upload
+        assert database.allow_ctas
+        assert database.allow_cvas
+        assert database.allow_dml
+        assert not database.allow_run_async
+        assert database.cache_timeout is None
+        assert database.database_name == "imported_database"
+        assert database.expose_in_sqllab
+        assert database.extra == "{}"
+        assert database.sqlalchemy_uri == "sqlite:///test.db"
+
+        model_ssh_tunnel = (
+            db.session.query(SSHTunnel)
+            .filter(SSHTunnel.database_id == database.id)
+            .one()
+        )
+        self.assertEqual(model_ssh_tunnel.private_key, "TestPrivateKey")
+        self.assertEqual(model_ssh_tunnel.private_key_password, "TEST")
+
+        db.session.delete(database)
+        db.session.commit()
+
+    @patch("superset.databases.schemas.is_feature_enabled")
+    def test_import_v1_database_masked_ssh_tunnel_no_credentials(
+        self, mock_schema_is_feature_enabled
+    ):
+        """Test that databases with ssh_tunnels that have no credentials are rejected"""
+        mock_schema_is_feature_enabled.return_value = True
+        masked_database_config = database_with_ssh_tunnel_config_no_credentials.copy()
+        contents = {
+            "metadata.yaml": yaml.safe_dump(database_metadata_config),
+            "databases/imported_database.yaml": yaml.safe_dump(masked_database_config),
+        }
+        command = ImportDatabasesCommand(contents)
+        with pytest.raises(CommandInvalidError) as excinfo:
+            command.run()
+        assert str(excinfo.value) == "Must provide credentials for the SSH Tunnel"
+
+    @patch("superset.databases.schemas.is_feature_enabled")
+    def test_import_v1_database_masked_ssh_tunnel_multiple_credentials(
+        self, mock_schema_is_feature_enabled
+    ):
+        """Test that databases with ssh_tunnels that have multiple credentials are rejected"""
+        mock_schema_is_feature_enabled.return_value = True
+        masked_database_config = database_with_ssh_tunnel_config_mix_credentials.copy()
+        contents = {
+            "metadata.yaml": yaml.safe_dump(database_metadata_config),
+            "databases/imported_database.yaml": yaml.safe_dump(masked_database_config),
+        }
+        command = ImportDatabasesCommand(contents)
+        with pytest.raises(CommandInvalidError) as excinfo:
+            command.run()
+        assert (
+            str(excinfo.value) == "Cannot have multiple credentials for the SSH Tunnel"
+        )
+
+    @patch("superset.databases.schemas.is_feature_enabled")
+    def test_import_v1_database_masked_ssh_tunnel_only_priv_key_psswd(
+        self, mock_schema_is_feature_enabled
+    ):
+        """Test that databases with ssh_tunnels that have multiple credentials are rejected"""
+        mock_schema_is_feature_enabled.return_value = True
+        masked_database_config = (
+            database_with_ssh_tunnel_config_private_pass_only.copy()
+        )
+        contents = {
+            "metadata.yaml": yaml.safe_dump(database_metadata_config),
+            "databases/imported_database.yaml": yaml.safe_dump(masked_database_config),
+        }
+        command = ImportDatabasesCommand(contents)
+        with pytest.raises(CommandInvalidError) as excinfo:
+            command.run()
+        assert str(excinfo.value) == "Error importing database"
+        assert excinfo.value.normalized_messages() == {
+            "databases/imported_database.yaml": {
+                "_schema": [
+                    "Must provide a private key for the ssh tunnel",
+                    "Must provide a private key password for the ssh tunnel",
+                ]
+            }
+        }
+
     @patch("superset.databases.commands.importers.v1.import_dataset")
     def test_import_v1_rollback(self, mock_import_dataset):
         """Test than on an exception everything is rolled back"""
@@ -648,11 +858,9 @@ class TestImportDatabasesCommand(SupersetTestCase):
 
 
 class TestTestConnectionDatabaseCommand(SupersetTestCase):
-    @mock.patch("superset.databases.dao.Database._get_sqla_engine")
-    @mock.patch(
-        "superset.databases.commands.test_connection.event_logger.log_with_context"
-    )
-    @mock.patch("superset.utils.core.g")
+    @patch("superset.databases.dao.Database._get_sqla_engine")
+    @patch("superset.databases.commands.test_connection.event_logger.log_with_context")
+    @patch("superset.utils.core.g")
     def test_connection_db_exception(
         self, mock_g, mock_event_logger, mock_get_sqla_engine
     ):
@@ -671,11 +879,9 @@ class TestTestConnectionDatabaseCommand(SupersetTestCase):
             )
         mock_event_logger.assert_called()
 
-    @mock.patch("superset.databases.dao.Database._get_sqla_engine")
-    @mock.patch(
-        "superset.databases.commands.test_connection.event_logger.log_with_context"
-    )
-    @mock.patch("superset.utils.core.g")
+    @patch("superset.databases.dao.Database._get_sqla_engine")
+    @patch("superset.databases.commands.test_connection.event_logger.log_with_context")
+    @patch("superset.utils.core.g")
     def test_connection_do_ping_exception(
         self, mock_g, mock_event_logger, mock_get_sqla_engine
     ):
@@ -696,11 +902,9 @@ class TestTestConnectionDatabaseCommand(SupersetTestCase):
             == SupersetErrorType.GENERIC_DB_ENGINE_ERROR
         )
 
-    @mock.patch("superset.databases.commands.test_connection.func_timeout")
-    @mock.patch(
-        "superset.databases.commands.test_connection.event_logger.log_with_context"
-    )
-    @mock.patch("superset.utils.core.g")
+    @patch("superset.databases.commands.test_connection.func_timeout")
+    @patch("superset.databases.commands.test_connection.event_logger.log_with_context")
+    @patch("superset.utils.core.g")
     def test_connection_do_ping_timeout(
         self, mock_g, mock_event_logger, mock_func_timeout
     ):
@@ -720,11 +924,9 @@ class TestTestConnectionDatabaseCommand(SupersetTestCase):
             == SupersetErrorType.CONNECTION_DATABASE_TIMEOUT
         )
 
-    @mock.patch("superset.databases.dao.Database._get_sqla_engine")
-    @mock.patch(
-        "superset.databases.commands.test_connection.event_logger.log_with_context"
-    )
-    @mock.patch("superset.utils.core.g")
+    @patch("superset.databases.dao.Database._get_sqla_engine")
+    @patch("superset.databases.commands.test_connection.event_logger.log_with_context")
+    @patch("superset.utils.core.g")
     def test_connection_superset_security_connection(
         self, mock_g, mock_event_logger, mock_get_sqla_engine
     ):
@@ -745,11 +947,9 @@ class TestTestConnectionDatabaseCommand(SupersetTestCase):
 
         mock_event_logger.assert_called()
 
-    @mock.patch("superset.databases.dao.Database._get_sqla_engine")
-    @mock.patch(
-        "superset.databases.commands.test_connection.event_logger.log_with_context"
-    )
-    @mock.patch("superset.utils.core.g")
+    @patch("superset.databases.dao.Database._get_sqla_engine")
+    @patch("superset.databases.commands.test_connection.event_logger.log_with_context")
+    @patch("superset.utils.core.g")
     def test_connection_db_api_exc(
         self, mock_g, mock_event_logger, mock_get_sqla_engine
     ):
@@ -772,9 +972,9 @@ class TestTestConnectionDatabaseCommand(SupersetTestCase):
         mock_event_logger.assert_called()
 
 
-@mock.patch("superset.db_engine_specs.base.is_hostname_valid")
-@mock.patch("superset.db_engine_specs.base.is_port_open")
-@mock.patch("superset.databases.commands.validate.DatabaseDAO")
+@patch("superset.db_engine_specs.base.is_hostname_valid")
+@patch("superset.db_engine_specs.base.is_port_open")
+@patch("superset.databases.commands.validate.DatabaseDAO")
 def test_validate(DatabaseDAO, is_port_open, is_hostname_valid, app_context):
     """
     Test parameter validation.
@@ -797,8 +997,8 @@ def test_validate(DatabaseDAO, is_port_open, is_hostname_valid, app_context):
     command.run()
 
 
-@mock.patch("superset.db_engine_specs.base.is_hostname_valid")
-@mock.patch("superset.db_engine_specs.base.is_port_open")
+@patch("superset.db_engine_specs.base.is_hostname_valid")
+@patch("superset.db_engine_specs.base.is_port_open")
 def test_validate_partial(is_port_open, is_hostname_valid, app_context):
     """
     Test parameter validation when only some parameters are present.
@@ -838,7 +1038,7 @@ def test_validate_partial(is_port_open, is_hostname_valid, app_context):
     ]
 
 
-@mock.patch("superset.db_engine_specs.base.is_hostname_valid")
+@patch("superset.db_engine_specs.base.is_hostname_valid")
 def test_validate_partial_invalid_hostname(is_hostname_valid, app_context):
     """
     Test parameter validation when only some parameters are present.
@@ -892,7 +1092,7 @@ def test_validate_partial_invalid_hostname(is_hostname_valid, app_context):
 
 
 class TestTablesDatabaseCommand(SupersetTestCase):
-    @mock.patch("superset.databases.dao.DatabaseDAO.find_by_id")
+    @patch("superset.databases.dao.DatabaseDAO.find_by_id")
     def test_database_tables_list_with_unknown_database(self, mock_find_by_id):
         mock_find_by_id.return_value = None
         command = TablesDatabaseCommand(1, "test", False)
@@ -901,9 +1101,9 @@ class TestTablesDatabaseCommand(SupersetTestCase):
             command.run()
             assert str(excinfo.value) == ("Database not found.")
 
-    @mock.patch("superset.databases.dao.DatabaseDAO.find_by_id")
-    @mock.patch("superset.security.manager.SupersetSecurityManager.can_access_database")
-    @mock.patch("superset.utils.core.g")
+    @patch("superset.databases.dao.DatabaseDAO.find_by_id")
+    @patch("superset.security.manager.SupersetSecurityManager.can_access_database")
+    @patch("superset.utils.core.g")
     def test_database_tables_superset_exception(
         self, mock_g, mock_can_access_database, mock_find_by_id
     ):
@@ -920,9 +1120,9 @@ class TestTablesDatabaseCommand(SupersetTestCase):
             command.run()
             assert str(excinfo.value) == "Test Error"
 
-    @mock.patch("superset.databases.dao.DatabaseDAO.find_by_id")
-    @mock.patch("superset.security.manager.SupersetSecurityManager.can_access_database")
-    @mock.patch("superset.utils.core.g")
+    @patch("superset.databases.dao.DatabaseDAO.find_by_id")
+    @patch("superset.security.manager.SupersetSecurityManager.can_access_database")
+    @patch("superset.utils.core.g")
     def test_database_tables_exception(
         self, mock_g, mock_can_access_database, mock_find_by_id
     ):
@@ -939,9 +1139,9 @@ class TestTablesDatabaseCommand(SupersetTestCase):
                 == "Unexpected error occurred, please check your logs for details"
             )
 
-    @mock.patch("superset.databases.dao.DatabaseDAO.find_by_id")
-    @mock.patch("superset.security.manager.SupersetSecurityManager.can_access_database")
-    @mock.patch("superset.utils.core.g")
+    @patch("superset.databases.dao.DatabaseDAO.find_by_id")
+    @patch("superset.security.manager.SupersetSecurityManager.can_access_database")
+    @patch("superset.utils.core.g")
     def test_database_tables_list_tables(
         self, mock_g, mock_can_access_database, mock_find_by_id
     ):
diff --git a/tests/integration_tests/datasets/commands_tests.py b/tests/integration_tests/datasets/commands_tests.py
index 0ce98477a0..6f5b796c6a 100644
--- a/tests/integration_tests/datasets/commands_tests.py
+++ b/tests/integration_tests/datasets/commands_tests.py
@@ -327,10 +327,11 @@ class TestImportDatasetsCommand(SupersetTestCase):
         db.session.commit()
 
     @patch("superset.datasets.commands.importers.v1.utils.g")
+    @patch("superset.security.manager.g")
     @pytest.mark.usefixtures("load_energy_table_with_slice")
-    def test_import_v1_dataset(self, mock_g):
+    def test_import_v1_dataset(self, sm_g, utils_g):
         """Test that we can import a dataset"""
-        mock_g.user = security_manager.find_user("admin")
+        admin = sm_g.user = utils_g.user = security_manager.find_user("admin")
         contents = {
             "metadata.yaml": yaml.safe_dump(dataset_metadata_config),
             "databases/imported_database.yaml": yaml.safe_dump(database_config),
@@ -360,7 +361,7 @@ class TestImportDatasetsCommand(SupersetTestCase):
         )
 
         # user should be included as one of the owners
-        assert dataset.owners == [mock_g.user]
+        assert dataset.owners == [admin]
 
         # database is also imported
         assert str(dataset.database.uuid) == "b8a1ccd3-779d-4ab7-8ad8-9ab119d7fe89"
@@ -395,8 +396,11 @@ class TestImportDatasetsCommand(SupersetTestCase):
         db.session.delete(dataset.database)
         db.session.commit()
 
-    def test_import_v1_dataset_multiple(self):
+    @patch("superset.security.manager.g")
+    def test_import_v1_dataset_multiple(self, mock_g):
         """Test that a dataset can be imported multiple times"""
+        mock_g.user = security_manager.find_user("admin")
+
         contents = {
             "metadata.yaml": yaml.safe_dump(dataset_metadata_config),
             "databases/imported_database.yaml": yaml.safe_dump(database_config),
@@ -483,8 +487,11 @@ class TestImportDatasetsCommand(SupersetTestCase):
             }
         }
 
-    def test_import_v1_dataset_existing_database(self):
+    @patch("superset.security.manager.g")
+    def test_import_v1_dataset_existing_database(self, mock_g):
         """Test that a dataset can be imported when the database already exists"""
+        mock_g.user = security_manager.find_user("admin")
+
         # first import database...
         contents = {
             "metadata.yaml": yaml.safe_dump(database_metadata_config),
diff --git a/tests/integration_tests/fixtures/importexport.py b/tests/integration_tests/fixtures/importexport.py
index b624f3e63c..0f695f044e 100644
--- a/tests/integration_tests/fixtures/importexport.py
+++ b/tests/integration_tests/fixtures/importexport.py
@@ -360,6 +360,111 @@ database_config: Dict[str, Any] = {
     "uuid": "b8a1ccd3-779d-4ab7-8ad8-9ab119d7fe89",
     "version": "1.0.0",
 }
+database_with_ssh_tunnel_config_private_key: Dict[str, Any] = {
+    "allow_csv_upload": True,
+    "allow_ctas": True,
+    "allow_cvas": True,
+    "allow_dml": True,
+    "allow_run_async": False,
+    "cache_timeout": None,
+    "database_name": "imported_database",
+    "expose_in_sqllab": True,
+    "extra": {},
+    "sqlalchemy_uri": "sqlite:///test.db",
+    "uuid": "b8a1ccd3-779d-4ab7-8ad8-9ab119d7fe89",
+    "ssh_tunnel": {
+        "server_address": "localhost",
+        "server_port": 22,
+        "username": "Test",
+        "private_key": "XXXXXXXXXX",
+        "private_key_password": "XXXXXXXXXX",
+    },
+    "version": "1.0.0",
+}
+
+database_with_ssh_tunnel_config_password: Dict[str, Any] = {
+    "allow_csv_upload": True,
+    "allow_ctas": True,
+    "allow_cvas": True,
+    "allow_dml": True,
+    "allow_run_async": False,
+    "cache_timeout": None,
+    "database_name": "imported_database",
+    "expose_in_sqllab": True,
+    "extra": {},
+    "sqlalchemy_uri": "sqlite:///test.db",
+    "uuid": "b8a1ccd3-779d-4ab7-8ad8-9ab119d7fe89",
+    "ssh_tunnel": {
+        "server_address": "localhost",
+        "server_port": 22,
+        "username": "Test",
+        "password": "XXXXXXXXXX",
+    },
+    "version": "1.0.0",
+}
+
+database_with_ssh_tunnel_config_no_credentials: Dict[str, Any] = {
+    "allow_csv_upload": True,
+    "allow_ctas": True,
+    "allow_cvas": True,
+    "allow_dml": True,
+    "allow_run_async": False,
+    "cache_timeout": None,
+    "database_name": "imported_database",
+    "expose_in_sqllab": True,
+    "extra": {},
+    "sqlalchemy_uri": "sqlite:///test.db",
+    "uuid": "b8a1ccd3-779d-4ab7-8ad8-9ab119d7fe89",
+    "ssh_tunnel": {
+        "server_address": "localhost",
+        "server_port": 22,
+        "username": "Test",
+    },
+    "version": "1.0.0",
+}
+
+database_with_ssh_tunnel_config_mix_credentials: Dict[str, Any] = {
+    "allow_csv_upload": True,
+    "allow_ctas": True,
+    "allow_cvas": True,
+    "allow_dml": True,
+    "allow_run_async": False,
+    "cache_timeout": None,
+    "database_name": "imported_database",
+    "expose_in_sqllab": True,
+    "extra": {},
+    "sqlalchemy_uri": "sqlite:///test.db",
+    "uuid": "b8a1ccd3-779d-4ab7-8ad8-9ab119d7fe89",
+    "ssh_tunnel": {
+        "server_address": "localhost",
+        "server_port": 22,
+        "username": "Test",
+        "password": "XXXXXXXXXX",
+        "private_key": "XXXXXXXXXX",
+    },
+    "version": "1.0.0",
+}
+
+database_with_ssh_tunnel_config_private_pass_only: Dict[str, Any] = {
+    "allow_csv_upload": True,
+    "allow_ctas": True,
+    "allow_cvas": True,
+    "allow_dml": True,
+    "allow_run_async": False,
+    "cache_timeout": None,
+    "database_name": "imported_database",
+    "expose_in_sqllab": True,
+    "extra": {},
+    "sqlalchemy_uri": "sqlite:///test.db",
+    "uuid": "b8a1ccd3-779d-4ab7-8ad8-9ab119d7fe89",
+    "ssh_tunnel": {
+        "server_address": "localhost",
+        "server_port": 22,
+        "username": "Test",
+        "private_key_password": "XXXXXXXXXX",
+    },
+    "version": "1.0.0",
+}
 
 dataset_config: Dict[str, Any] = {
     "table_name": "imported_dataset",
diff --git a/tests/integration_tests/queries/saved_queries/commands_tests.py b/tests/integration_tests/queries/saved_queries/commands_tests.py
index bd90419155..5c7b862209 100644
--- a/tests/integration_tests/queries/saved_queries/commands_tests.py
+++ b/tests/integration_tests/queries/saved_queries/commands_tests.py
@@ -142,8 +142,11 @@ class TestExportSavedQueriesCommand(SupersetTestCase):
 
 
 class TestImportSavedQueriesCommand(SupersetTestCase):
-    def test_import_v1_saved_queries(self):
+    @patch("superset.security.manager.g")
+    def test_import_v1_saved_queries(self, mock_g):
         """Test that we can import a saved query"""
+        mock_g.user = security_manager.find_user("admin")
+
         contents = {
             "metadata.yaml": yaml.safe_dump(saved_queries_metadata_config),
             "databases/imported_database.yaml": yaml.safe_dump(database_config),
@@ -169,8 +172,11 @@ class TestImportSavedQueriesCommand(SupersetTestCase):
         db.session.delete(database)
         db.session.commit()
 
-    def test_import_v1_saved_queries_multiple(self):
+    @patch("superset.security.manager.g")
+    def test_import_v1_saved_queries_multiple(self, mock_g):
         """Test that a saved query can be imported multiple times"""
+        mock_g.user = security_manager.find_user("admin")
+
         contents = {
             "metadata.yaml": yaml.safe_dump(saved_queries_metadata_config),
             "databases/imported_database.yaml": yaml.safe_dump(database_config),
diff --git a/tests/unit_tests/charts/commands/importers/v1/import_test.py b/tests/unit_tests/charts/commands/importers/v1/import_test.py
index e29fd70fb8..06e0063fe9 100644
--- a/tests/unit_tests/charts/commands/importers/v1/import_test.py
+++ b/tests/unit_tests/charts/commands/importers/v1/import_test.py
@@ -18,19 +18,26 @@
 
 import copy
 
+import pytest
+from pytest_mock import MockFixture
 from sqlalchemy.orm.session import Session
 
+from superset.commands.exceptions import ImportFailedError
 
-def test_import_chart(session: Session) -> None:
+
+def test_import_chart(mocker: MockFixture, session: Session) -> None:
     """
     Test importing a chart.
     """
+    from superset import security_manager
     from superset.charts.commands.importers.v1.utils import import_chart
     from superset.connectors.sqla.models import SqlaTable
     from superset.models.core import Database
     from superset.models.slice import Slice
     from tests.integration_tests.fixtures.importexport import chart_config
 
+    mocker.patch.object(security_manager, "can_access", return_value=True)
+
     engine = session.get_bind()
     Slice.metadata.create_all(engine)  # pylint: disable=no-member
 
@@ -45,16 +52,19 @@ def test_import_chart(session: Session) -> None:
     assert chart.external_url is None
 
 
-def test_import_chart_managed_externally(session: Session) -> None:
+def test_import_chart_managed_externally(mocker: MockFixture, session: Session) -> None:
     """
     Test importing a chart that is managed externally.
     """
+    from superset import security_manager
     from superset.charts.commands.importers.v1.utils import import_chart
     from superset.connectors.sqla.models import SqlaTable
     from superset.models.core import Database
     from superset.models.slice import Slice
     from tests.integration_tests.fixtures.importexport import chart_config
 
+    mocker.patch.object(security_manager, "can_access", return_value=True)
+
     engine = session.get_bind()
     Slice.metadata.create_all(engine)  # pylint: disable=no-member
 
@@ -67,3 +77,34 @@ def test_import_chart_managed_externally(session: Session) -> None:
     chart = import_chart(session, config)
     assert chart.is_managed_externally is True
     assert chart.external_url == "https://example.org/my_chart"
+
+
+def test_import_chart_without_permission(
+    mocker: MockFixture,
+    session: Session,
+) -> None:
+    """
+    Test importing a chart when a user doesn't have permissions to create.
+    """
+    from superset import security_manager
+    from superset.charts.commands.importers.v1.utils import import_chart
+    from superset.connectors.sqla.models import SqlaTable
+    from superset.models.core import Database
+    from superset.models.slice import Slice
+    from tests.integration_tests.fixtures.importexport import chart_config
+
+    mocker.patch.object(security_manager, "can_access", return_value=False)
+
+    engine = session.get_bind()
+    Slice.metadata.create_all(engine)  # pylint: disable=no-member
+
+    config = copy.deepcopy(chart_config)
+    config["datasource_id"] = 1
+    config["datasource_type"] = "table"
+
+    with pytest.raises(ImportFailedError) as excinfo:
+        import_chart(session, config)
+    assert (
+        str(excinfo.value)
+        == "Chart doesn't exist and user doesn't have permission to create charts"
+    )
diff --git a/tests/unit_tests/commands/importers/v1/assets_test.py b/tests/unit_tests/commands/importers/v1/assets_test.py
index 1a345ff2b9..d48eed1be7 100644
--- a/tests/unit_tests/commands/importers/v1/assets_test.py
+++ b/tests/unit_tests/commands/importers/v1/assets_test.py
@@ -17,6 +17,7 @@
 
 import copy
 
+from pytest_mock import MockFixture
 from sqlalchemy.orm.session import Session
 from sqlalchemy.sql import select
 
@@ -30,14 +31,17 @@ from tests.unit_tests.fixtures.assets_configs import (
 )
 
 
-def test_import_new_assets(session: Session) -> None:
+def test_import_new_assets(mocker: MockFixture, session: Session) -> None:
     """
     Test that all new assets are imported correctly.
     """
+    from superset import security_manager
     from superset.commands.importers.v1.assets import ImportAssetsCommand
     from superset.models.dashboard import dashboard_slices
     from superset.models.slice import Slice
 
+    mocker.patch.object(security_manager, "can_access", return_value=True)
+
     engine = session.get_bind()
     Slice.metadata.create_all(engine)  # pylint: disable=no-member
     configs = {
@@ -59,14 +63,17 @@ def test_import_new_assets(session: Session) -> None:
     assert len(dashboard_ids) == expected_number_of_dashboards
 
 
-def test_import_adds_dashboard_charts(session: Session) -> None:
+def test_import_adds_dashboard_charts(mocker: MockFixture, session: Session) -> None:
     """
     Test that existing dashboards are updated with new charts.
     """
+    from superset import security_manager
     from superset.commands.importers.v1.assets import ImportAssetsCommand
     from superset.models.dashboard import dashboard_slices
     from superset.models.slice import Slice
 
+    mocker.patch.object(security_manager, "can_access", return_value=True)
+
     engine = session.get_bind()
     Slice.metadata.create_all(engine)  # pylint: disable=no-member
     base_configs = {
@@ -95,14 +102,17 @@ def test_import_adds_dashboard_charts(session: Session) -> None:
     assert len(dashboard_ids) == expected_number_of_dashboards
 
 
-def test_import_removes_dashboard_charts(session: Session) -> None:
+def test_import_removes_dashboard_charts(mocker: MockFixture, session: Session) -> None:
     """
     Test that existing dashboards are updated without old charts.
     """
+    from superset import security_manager
     from superset.commands.importers.v1.assets import ImportAssetsCommand
     from superset.models.dashboard import dashboard_slices
     from superset.models.slice import Slice
 
+    mocker.patch.object(security_manager, "can_access", return_value=True)
+
     engine = session.get_bind()
     Slice.metadata.create_all(engine)  # pylint: disable=no-member
     base_configs = {
diff --git a/tests/unit_tests/dashboards/commands/importers/v1/import_test.py b/tests/unit_tests/dashboards/commands/importers/v1/import_test.py
index 08f681d916..e07a23f6bf 100644
--- a/tests/unit_tests/dashboards/commands/importers/v1/import_test.py
+++ b/tests/unit_tests/dashboards/commands/importers/v1/import_test.py
@@ -18,19 +18,26 @@
 
 import copy
 
+import pytest
+from pytest_mock import MockFixture
 from sqlalchemy.orm.session import Session
 
+from superset.commands.exceptions import ImportFailedError
 
-def test_import_dashboard(session: Session) -> None:
+
+def test_import_dashboard(mocker: MockFixture, session: Session) -> None:
     """
     Test importing a dashboard.
     """
+    from superset import security_manager
     from superset.connectors.sqla.models import SqlaTable
     from superset.dashboards.commands.importers.v1.utils import import_dashboard
     from superset.models.core import Database
     from superset.models.slice import Slice
     from tests.integration_tests.fixtures.importexport import dashboard_config
 
+    mocker.patch.object(security_manager, "can_access", return_value=True)
+
     engine = session.get_bind()
     Slice.metadata.create_all(engine)  # pylint: disable=no-member
 
@@ -43,16 +50,22 @@ def test_import_dashboard(session: Session) -> None:
     assert dashboard.external_url is None
 
 
-def test_import_dashboard_managed_externally(session: Session) -> None:
+def test_import_dashboard_managed_externally(
+    mocker: MockFixture,
+    session: Session,
+) -> None:
     """
     Test importing a dashboard that is managed externally.
     """
+    from superset import security_manager
     from superset.connectors.sqla.models import SqlaTable
     from superset.dashboards.commands.importers.v1.utils import import_dashboard
     from superset.models.core import Database
     from superset.models.slice import Slice
     from tests.integration_tests.fixtures.importexport import dashboard_config
 
+    mocker.patch.object(security_manager, "can_access", return_value=True)
+
     engine = session.get_bind()
     Slice.metadata.create_all(engine)  # pylint: disable=no-member
 
@@ -63,3 +76,32 @@ def test_import_dashboard_managed_externally(session: Session) -> None:
     dashboard = import_dashboard(session, config)
     assert dashboard.is_managed_externally is True
     assert dashboard.external_url == "https://example.org/my_dashboard"
+
+
+def test_import_dashboard_without_permission(
+    mocker: MockFixture,
+    session: Session,
+) -> None:
+    """
+    Test importing a dashboard when a user doesn't have permissions to create.
+    """
+    from superset import security_manager
+    from superset.connectors.sqla.models import SqlaTable
+    from superset.dashboards.commands.importers.v1.utils import import_dashboard
+    from superset.models.core import Database
+    from superset.models.slice import Slice
+    from tests.integration_tests.fixtures.importexport import dashboard_config
+
+    mocker.patch.object(security_manager, "can_access", return_value=False)
+
+    engine = session.get_bind()
+    Slice.metadata.create_all(engine)  # pylint: disable=no-member
+
+    config = copy.deepcopy(dashboard_config)
+
+    with pytest.raises(ImportFailedError) as excinfo:
+        import_dashboard(session, config)
+    assert (
+        str(excinfo.value)
+        == "Dashboard doesn't exist and user doesn't have permission to create dashboards"
+    )
diff --git a/tests/unit_tests/databases/commands/importers/v1/import_test.py b/tests/unit_tests/databases/commands/importers/v1/import_test.py
index e665bcb505..f9d2695f26 100644
--- a/tests/unit_tests/databases/commands/importers/v1/import_test.py
+++ b/tests/unit_tests/databases/commands/importers/v1/import_test.py
@@ -18,17 +18,24 @@
 
 import copy
 
+import pytest
+from pytest_mock import MockFixture
 from sqlalchemy.orm.session import Session
 
+from superset.commands.exceptions import ImportFailedError
 
-def test_import_database(session: Session) -> None:
+
+def test_import_database(mocker: MockFixture, session: Session) -> None:
     """
     Test importing a database.
     """
+    from superset import security_manager
     from superset.databases.commands.importers.v1.utils import import_database
     from superset.models.core import Database
     from tests.integration_tests.fixtures.importexport import database_config
 
+    mocker.patch.object(security_manager, "can_access", return_value=True)
+
     engine = session.get_bind()
     Database.metadata.create_all(engine)  # pylint: disable=no-member
 
@@ -58,14 +65,20 @@ def test_import_database(session: Session) -> None:
     assert database.allow_dml is False
 
 
-def test_import_database_managed_externally(session: Session) -> None:
+def test_import_database_managed_externally(
+    mocker: MockFixture,
+    session: Session,
+) -> None:
     """
     Test importing a database that is managed externally.
     """
+    from superset import security_manager
     from superset.databases.commands.importers.v1.utils import import_database
     from superset.models.core import Database
     from tests.integration_tests.fixtures.importexport import database_config
 
+    mocker.patch.object(security_manager, "can_access", return_value=True)
+
     engine = session.get_bind()
     Database.metadata.create_all(engine)  # pylint: disable=no-member
 
@@ -76,3 +89,30 @@ def test_import_database_managed_externally(session: Session) -> None:
     database = import_database(session, config)
     assert database.is_managed_externally is True
     assert database.external_url == "https://example.org/my_database"
+
+
+def test_import_database_without_permission(
+    mocker: MockFixture,
+    session: Session,
+) -> None:
+    """
+    Test importing a database when a user doesn't have permissions to create.
+    """
+    from superset import security_manager
+    from superset.databases.commands.importers.v1.utils import import_database
+    from superset.models.core import Database
+    from tests.integration_tests.fixtures.importexport import database_config
+
+    mocker.patch.object(security_manager, "can_access", return_value=False)
+
+    engine = session.get_bind()
+    Database.metadata.create_all(engine)  # pylint: disable=no-member
+
+    config = copy.deepcopy(database_config)
+
+    with pytest.raises(ImportFailedError) as excinfo:
+        import_database(session, config)
+    assert (
+        str(excinfo.value)
+        == "Database doesn't exist and user doesn't have permission to create databases"
+    )
diff --git a/tests/unit_tests/datasets/commands/importers/v1/import_test.py b/tests/unit_tests/datasets/commands/importers/v1/import_test.py
index 5b52ac7f1d..839374425b 100644
--- a/tests/unit_tests/datasets/commands/importers/v1/import_test.py
+++ b/tests/unit_tests/datasets/commands/importers/v1/import_test.py
@@ -25,20 +25,27 @@ from unittest.mock import Mock, patch
 
 import pytest
 from flask import current_app
+from pytest_mock import MockFixture
 from sqlalchemy.orm.session import Session
 
-from superset.datasets.commands.exceptions import DatasetForbiddenDataURI
+from superset.datasets.commands.exceptions import (
+    DatasetForbiddenDataURI,
+    ImportFailedError,
+)
 from superset.datasets.commands.importers.v1.utils import validate_data_uri
 
 
-def test_import_dataset(session: Session) -> None:
+def test_import_dataset(mocker: MockFixture, session: Session) -> None:
     """
     Test importing a dataset.
     """
+    from superset import security_manager
     from superset.connectors.sqla.models import SqlaTable
     from superset.datasets.commands.importers.v1.utils import import_dataset
     from superset.models.core import Database
 
+    mocker.patch.object(security_manager, "can_access", return_value=True)
+
     engine = session.get_bind()
     SqlaTable.metadata.create_all(engine)  # pylint: disable=no-member
 
@@ -143,15 +150,18 @@ def test_import_dataset(session: Session) -> None:
     assert sqla_table.database.id == database.id
 
 
-def test_import_dataset_duplicate_column(session: Session) -> None:
+def test_import_dataset_duplicate_column(mocker: MockFixture, session: Session) -> None:
     """
     Test importing a dataset with a column that already exists.
     """
+    from superset import security_manager
     from superset.columns.models import Column as NewColumn
     from superset.connectors.sqla.models import SqlaTable, TableColumn
     from superset.datasets.commands.importers.v1.utils import import_dataset
     from superset.models.core import Database
 
+    mocker.patch.object(security_manager, "can_access", return_value=True)
+
     engine = session.get_bind()
     SqlaTable.metadata.create_all(engine)  # pylint: disable=no-member
 
@@ -266,15 +276,18 @@ def test_import_dataset_duplicate_column(session: Session) -> None:
     assert sqla_table.database.id == database.id
 
 
-def test_import_column_extra_is_string(session: Session) -> None:
+def test_import_column_extra_is_string(mocker: MockFixture, session: Session) -> None:
     """
     Test importing a dataset when the column extra is a string.
     """
+    from superset import security_manager
     from superset.connectors.sqla.models import SqlaTable, SqlMetric, TableColumn
     from superset.datasets.commands.importers.v1.utils import import_dataset
     from superset.datasets.schemas import ImportV1DatasetSchema
     from superset.models.core import Database
 
+    mocker.patch.object(security_manager, "can_access", return_value=True)
+
     engine = session.get_bind()
     SqlaTable.metadata.create_all(engine)  # pylint: disable=no-member
 
@@ -347,12 +360,17 @@ def test_import_column_extra_is_string(session: Session) -> None:
 
 
 @patch("superset.datasets.commands.importers.v1.utils.request")
-def test_import_column_allowed_data_url(request: Mock, session: Session) -> None:
+def test_import_column_allowed_data_url(
+    request: Mock,
+    mocker: MockFixture,
+    session: Session,
+) -> None:
     """
     Test importing a dataset when using data key to fetch data from a URL.
     """
     import io
 
+    from superset import security_manager
     from superset.connectors.sqla.models import SqlaTable
     from superset.datasets.commands.importers.v1.utils import import_dataset
     from superset.datasets.schemas import ImportV1DatasetSchema
@@ -360,6 +378,8 @@ def test_import_column_allowed_data_url(request: Mock, session: Session) -> None
 
     request.urlopen.return_value = io.StringIO("col1\nvalue1\nvalue2\n")
 
+    mocker.patch.object(security_manager, "can_access", return_value=True)
+
     engine = session.get_bind()
     SqlaTable.metadata.create_all(engine)  # pylint: disable=no-member
 
@@ -419,15 +439,21 @@ def test_import_column_allowed_data_url(request: Mock, session: Session) -> None
     ).fetchall()
 
 
-def test_import_dataset_managed_externally(session: Session) -> None:
+def test_import_dataset_managed_externally(
+    mocker: MockFixture,
+    session: Session,
+) -> None:
     """
     Test importing a dataset that is managed externally.
     """
+    from superset import security_manager
     from superset.connectors.sqla.models import SqlaTable
     from superset.datasets.commands.importers.v1.utils import import_dataset
     from superset.models.core import Database
     from tests.integration_tests.fixtures.importexport import dataset_config
 
+    mocker.patch.object(security_manager, "can_access", return_value=True)
+
     engine = session.get_bind()
     SqlaTable.metadata.create_all(engine)  # pylint: disable=no-member