You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@superset.apache.org by be...@apache.org on 2023/08/11 13:35:10 UTC
[superset] branch master updated: feat(gsheets): file upload (#24921)
This is an automated email from the ASF dual-hosted git repository.
beto pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/superset.git
The following commit(s) were added to refs/heads/master by this push:
new f5ed4072e4 feat(gsheets): file upload (#24921)
f5ed4072e4 is described below
commit f5ed4072e4470cd2c9867f9b5ea96e0a34cf06e7
Author: Beto Dealmeida <ro...@dealmeida.net>
AuthorDate: Fri Aug 11 06:35:00 2023 -0700
feat(gsheets): file upload (#24921)
---
requirements/development.txt | 2 +-
requirements/testing.in | 2 +-
requirements/testing.txt | 7 +-
superset/db_engine_specs/gsheets.py | 162 ++++++++++++++++++++---
tests/integration_tests/databases/api_tests.py | 2 +-
tests/unit_tests/databases/api_test.py | 4 +-
tests/unit_tests/db_engine_specs/test_gsheets.py | 92 +++++++++++++
7 files changed, 250 insertions(+), 21 deletions(-)
diff --git a/requirements/development.txt b/requirements/development.txt
index bba99045cc..7155b2e1a5 100644
--- a/requirements/development.txt
+++ b/requirements/development.txt
@@ -106,7 +106,7 @@ pylint==2.17.4
# via -r requirements/development.in
python-ldap==3.4.3
# via -r requirements/development.in
-requests==2.30.0
+requests==2.31.0
# via
# pydruid
# tableschema
diff --git a/requirements/testing.in b/requirements/testing.in
index 856c5272dc..b991be1040 100644
--- a/requirements/testing.in
+++ b/requirements/testing.in
@@ -16,7 +16,7 @@
#
-r development.in
-r integration.in
--e file:.[bigquery,hive,presto,prophet,trino]
+-e file:.[bigquery,hive,presto,prophet,trino,gsheets]
docker
flask-testing
freezegun
diff --git a/requirements/testing.txt b/requirements/testing.txt
index 283b4c9fcd..c8a3221b45 100644
--- a/requirements/testing.txt
+++ b/requirements/testing.txt
@@ -1,4 +1,4 @@
-# SHA1:78fe89f88adf34ac75513d363d7d9d0b5cc8cd1c
+# SHA1:78d0270a4f583095e0587aa21f57fc2ff7fe8b84
#
# This file is autogenerated by pip-compile-multi
# To update, run:
@@ -12,6 +12,8 @@
# -r requirements/base.in
# -r requirements/development.in
# -r requirements/testing.in
+apsw==3.42.0.1
+ # via shillelagh
cmdstanpy==1.1.0
# via prophet
contourpy==1.0.7
@@ -50,6 +52,7 @@ google-auth==2.17.3
# google-cloud-core
# pandas-gbq
# pydata-google-auth
+ # shillelagh
# sqlalchemy-bigquery
google-auth-oauthlib==1.0.0
# via
@@ -142,6 +145,8 @@ rfc3339-validator==0.1.4
# via openapi-schema-validator
rsa==4.9
# via google-auth
+shillelagh[gsheetsapi]==1.2.6
+ # via apache-superset
sqlalchemy-bigquery==1.6.1
# via apache-superset
statsd==4.0.1
diff --git a/superset/db_engine_specs/gsheets.py b/superset/db_engine_specs/gsheets.py
index 777499a8f9..a9ec921188 100644
--- a/superset/db_engine_specs/gsheets.py
+++ b/superset/db_engine_specs/gsheets.py
@@ -14,30 +14,44 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
+from __future__ import annotations
+
import json
+import logging
import re
from re import Pattern
-from typing import Any, Optional, TYPE_CHECKING
+from typing import Any, TYPE_CHECKING
+import pandas as pd
from apispec import APISpec
from apispec.ext.marshmallow import MarshmallowPlugin
from flask import g
from flask_babel import gettext as __
from marshmallow import fields, Schema
from marshmallow.exceptions import ValidationError
+from requests import Session
from sqlalchemy.engine import create_engine
from sqlalchemy.engine.url import URL
from typing_extensions import TypedDict
-from superset import security_manager
+from superset import db, security_manager
from superset.constants import PASSWORD_MASK
from superset.databases.schemas import encrypted_field_properties, EncryptedString
from superset.db_engine_specs.shillelagh import ShillelaghEngineSpec
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
+from superset.exceptions import SupersetException
if TYPE_CHECKING:
from superset.models.core import Database
+ from superset.sql_parse import Table
+_logger = logging.getLogger()
+
+EXAMPLE_GSHEETS_URL = (
+ "https://docs.google.com/spreadsheets/d/"
+ "1LcWZMsdCl92g7nA-D6qGRqg1T5TiHyuKJUY1u9XAnsk/edit#gid=0"
+)
SYNTAX_ERROR_REGEX = re.compile('SQLError: near "(?P<server_error>.*?)": syntax error')
@@ -57,7 +71,7 @@ class GSheetsParametersSchema(Schema):
class GSheetsParametersType(TypedDict):
service_account_info: str
- catalog: Optional[dict[str, str]]
+ catalog: dict[str, str] | None
class GSheetsPropertiesType(TypedDict):
@@ -88,14 +102,14 @@ class GSheetsEngineSpec(ShillelaghEngineSpec):
),
}
- supports_file_upload = False
+ supports_file_upload = True
@classmethod
def get_url_for_impersonation(
cls,
url: URL,
impersonate_user: bool,
- username: Optional[str],
+ username: str | None,
) -> URL:
if impersonate_user and username is not None:
user = security_manager.find_user(username=username)
@@ -107,9 +121,9 @@ class GSheetsEngineSpec(ShillelaghEngineSpec):
@classmethod
def extra_table_metadata(
cls,
- database: "Database",
+ database: Database,
table_name: str,
- schema_name: Optional[str],
+ schema_name: str | None,
) -> dict[str, Any]:
with database.get_raw_connection(schema=schema_name) as conn:
cursor = conn.cursor()
@@ -126,9 +140,8 @@ class GSheetsEngineSpec(ShillelaghEngineSpec):
def build_sqlalchemy_uri(
cls,
_: GSheetsParametersType,
- encrypted_extra: Optional[ # pylint: disable=unused-argument
- dict[str, Any]
- ] = None,
+ encrypted_extra: None # pylint: disable=unused-argument
+ | (dict[str, Any]) = None,
) -> str:
return "gsheets://"
@@ -136,7 +149,7 @@ class GSheetsEngineSpec(ShillelaghEngineSpec):
def get_parameters_from_uri(
cls,
uri: str, # pylint: disable=unused-argument
- encrypted_extra: Optional[dict[str, Any]] = None,
+ encrypted_extra: dict[str, Any] | None = None,
) -> Any:
# Building parameters from encrypted_extra and uri
if encrypted_extra:
@@ -145,7 +158,7 @@ class GSheetsEngineSpec(ShillelaghEngineSpec):
raise ValidationError("Invalid service credentials")
@classmethod
- def mask_encrypted_extra(cls, encrypted_extra: Optional[str]) -> Optional[str]:
+ def mask_encrypted_extra(cls, encrypted_extra: str | None) -> str | None:
if encrypted_extra is None:
return encrypted_extra
@@ -162,9 +175,7 @@ class GSheetsEngineSpec(ShillelaghEngineSpec):
return json.dumps(config)
@classmethod
- def unmask_encrypted_extra(
- cls, old: Optional[str], new: Optional[str]
- ) -> Optional[str]:
+ def unmask_encrypted_extra(cls, old: str | None, new: str | None) -> str | None:
"""
Reuse ``private_key`` if available and unchanged.
"""
@@ -299,3 +310,124 @@ class GSheetsEngineSpec(ShillelaghEngineSpec):
)
idx += 1
return errors
+
+ @staticmethod
+ def _do_post(
+ session: Session,
+ url: str,
+ body: dict[str, Any],
+ **kwargs: Any,
+ ) -> dict[str, Any]:
+ """
+ POST to the Google API.
+
+ Helper function that handles logging and error handling.
+ """
+ _logger.info("POST %s", url)
+ _logger.debug(body)
+ response = session.post(
+ url,
+ json=body,
+ **kwargs,
+ )
+
+ payload = response.json()
+ _logger.debug(payload)
+
+ if "error" in payload:
+ raise SupersetException(payload["error"]["message"])
+
+ return payload
+
+ @classmethod
+ def df_to_sql( # pylint: disable=too-many-locals
+ cls,
+ database: Database,
+ table: Table,
+ df: pd.DataFrame,
+ to_sql_kwargs: dict[str, Any],
+ ) -> None:
+ """
+ Create a new sheet and update the DB catalog.
+
+ Since Google Sheets is not a database, uploading a file is slightly different
+ from other traditional databases. To create a table with a given name we first
+ create a spreadsheet with the contents of the dataframe, and we later update the
+ database catalog to add a mapping between the desired table name and the URL of
+ the new sheet.
+
+ If the table already exists and the user wants it replaced we clear all the
+ cells in the existing sheet before uploading the new data. Appending to an
+ existing table is not supported because we can't ensure that the schemas match.
+ """
+ # pylint: disable=import-outside-toplevel
+ from shillelagh.backends.apsw.dialects.base import get_adapter_for_table_name
+
+ # grab the existing catalog, if any
+ extra = database.get_extra()
+ engine_params = extra.setdefault("engine_params", {})
+ catalog = engine_params.setdefault("catalog", {})
+
+ # sanity checks
+ spreadsheet_url = catalog.get(table.table)
+ if spreadsheet_url and "if_exists" in to_sql_kwargs:
+ if to_sql_kwargs["if_exists"] == "append":
+ # no way we're going to append a dataframe to a spreadsheet, that's
+ # never going to work
+ raise SupersetException("Append operation not currently supported")
+ if to_sql_kwargs["if_exists"] == "fail":
+ raise SupersetException("Table already exists")
+ if to_sql_kwargs["if_exists"] == "replace":
+ pass
+
+ # get the Google session from the Shillelagh adapter
+ with cls.get_engine(database) as engine:
+ with engine.connect() as conn:
+ # any GSheets URL will work to get a working session
+ adapter = get_adapter_for_table_name(
+ conn,
+ spreadsheet_url or EXAMPLE_GSHEETS_URL,
+ )
+ session = adapter._get_session() # pylint: disable=protected-access
+
+ # clear existing sheet, or create a new one
+ if spreadsheet_url:
+ spreadsheet_id = adapter._spreadsheet_id # pylint: disable=protected-access
+ range_ = adapter._sheet_name # pylint: disable=protected-access
+ url = (
+ "https://sheets.googleapis.com/v4/spreadsheets/"
+ f"{spreadsheet_id}/values/{range_}:clear"
+ )
+ cls._do_post(session, url, {})
+ else:
+ payload = cls._do_post(
+ session,
+ "https://sheets.googleapis.com/v4/spreadsheets",
+ {"properties": {"title": table.table}},
+ )
+ spreadsheet_id = payload["spreadsheetId"]
+ range_ = payload["sheets"][0]["properties"]["title"]
+ spreadsheet_url = payload["spreadsheetUrl"]
+
+ # insert data
+ body = {
+ "range": range_,
+ "majorDimension": "ROWS",
+ "values": df.fillna("").values.tolist(),
+ }
+ url = (
+ "https://sheets.googleapis.com/v4/spreadsheets/"
+ f"{spreadsheet_id}/values/{range_}:append"
+ )
+ cls._do_post(
+ session,
+ url,
+ body,
+ params={"valueInputOption": "USER_ENTERED"},
+ )
+
+ # update catalog
+ catalog[table.table] = spreadsheet_url
+ database.extra = json.dumps(extra)
+ db.session.add(database)
+ db.session.commit()
diff --git a/tests/integration_tests/databases/api_tests.py b/tests/integration_tests/databases/api_tests.py
index 4709a11377..cbdacc8f34 100644
--- a/tests/integration_tests/databases/api_tests.py
+++ b/tests/integration_tests/databases/api_tests.py
@@ -3153,7 +3153,7 @@ class TestDatabaseApi(SupersetTestCase):
"preferred": False,
"sqlalchemy_uri_placeholder": "gsheets://",
"engine_information": {
- "supports_file_upload": False,
+ "supports_file_upload": True,
"disable_ssh_tunneling": True,
},
},
diff --git a/tests/unit_tests/databases/api_test.py b/tests/unit_tests/databases/api_test.py
index 899e2b0234..aa15645ddb 100644
--- a/tests/unit_tests/databases/api_test.py
+++ b/tests/unit_tests/databases/api_test.py
@@ -178,7 +178,7 @@ def test_database_connection(
"driver": "gsheets",
"engine_information": {
"disable_ssh_tunneling": True,
- "supports_file_upload": False,
+ "supports_file_upload": True,
},
"expose_in_sqllab": True,
"extra": '{\n "metadata_params": {},\n "engine_params": {},\n "metadata_cache_timeout": {},\n "schemas_allowed_for_file_upload": []\n}\n',
@@ -249,7 +249,7 @@ def test_database_connection(
"driver": "gsheets",
"engine_information": {
"disable_ssh_tunneling": True,
- "supports_file_upload": False,
+ "supports_file_upload": True,
},
"expose_in_sqllab": True,
"force_ctas_schema": None,
diff --git a/tests/unit_tests/db_engine_specs/test_gsheets.py b/tests/unit_tests/db_engine_specs/test_gsheets.py
index 042e486642..7d7348c1a3 100644
--- a/tests/unit_tests/db_engine_specs/test_gsheets.py
+++ b/tests/unit_tests/db_engine_specs/test_gsheets.py
@@ -19,9 +19,13 @@
import json
+import pandas as pd
+import pytest
from pytest_mock import MockFixture
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
+from superset.exceptions import SupersetException
+from superset.sql_parse import Table
class ProgrammingError(Exception):
@@ -307,3 +311,91 @@ def test_unmask_encrypted_extra_when_new_is_none() -> None:
new = None
assert GSheetsEngineSpec.unmask_encrypted_extra(old, new) is None
+
+
+def test_upload_new(mocker: MockFixture) -> None:
+ """
+ Test file upload when the table does not exist.
+ """
+ from superset.db_engine_specs.gsheets import GSheetsEngineSpec
+
+ mocker.patch("superset.db_engine_specs.gsheets.db")
+ get_adapter_for_table_name = mocker.patch(
+ "shillelagh.backends.apsw.dialects.base.get_adapter_for_table_name"
+ )
+ session = get_adapter_for_table_name()._get_session()
+ session.post().json.return_value = {
+ "spreadsheetId": 1,
+ "spreadsheetUrl": "https://docs.example.org",
+ "sheets": [{"properties": {"title": "sample_data"}}],
+ }
+
+ database = mocker.MagicMock()
+ database.get_extra.return_value = {}
+
+ df = pd.DataFrame([1, "foo", 3.0])
+ table = Table("sample_data")
+
+ GSheetsEngineSpec.df_to_sql(database, table, df, {})
+ assert database.extra == json.dumps(
+ {"engine_params": {"catalog": {"sample_data": "https://docs.example.org"}}}
+ )
+
+
+def test_upload_existing(mocker: MockFixture) -> None:
+ """
+ Test file upload when the table does exist.
+ """
+ from superset.db_engine_specs.gsheets import GSheetsEngineSpec
+
+ mocker.patch("superset.db_engine_specs.gsheets.db")
+ get_adapter_for_table_name = mocker.patch(
+ "shillelagh.backends.apsw.dialects.base.get_adapter_for_table_name"
+ )
+ adapter = get_adapter_for_table_name()
+ adapter._spreadsheet_id = 1
+ adapter._sheet_name = "sheet0"
+ session = adapter._get_session()
+ session.post().json.return_value = {
+ "spreadsheetId": 1,
+ "spreadsheetUrl": "https://docs.example.org",
+ "sheets": [{"properties": {"title": "sample_data"}}],
+ }
+
+ database = mocker.MagicMock()
+ database.get_extra.return_value = {
+ "engine_params": {"catalog": {"sample_data": "https://docs.example.org"}}
+ }
+
+ df = pd.DataFrame([1, "foo", 3.0])
+ table = Table("sample_data")
+
+ with pytest.raises(SupersetException) as excinfo:
+ GSheetsEngineSpec.df_to_sql(database, table, df, {"if_exists": "append"})
+ assert str(excinfo.value) == "Append operation not currently supported"
+
+ with pytest.raises(SupersetException) as excinfo:
+ GSheetsEngineSpec.df_to_sql(database, table, df, {"if_exists": "fail"})
+ assert str(excinfo.value) == "Table already exists"
+
+ GSheetsEngineSpec.df_to_sql(database, table, df, {"if_exists": "replace"})
+ session.post.assert_has_calls(
+ [
+ mocker.call(),
+ mocker.call(
+ "https://sheets.googleapis.com/v4/spreadsheets/1/values/sheet0:clear",
+ json={},
+ ),
+ mocker.call().json(),
+ mocker.call(
+ "https://sheets.googleapis.com/v4/spreadsheets/1/values/sheet0:append",
+ json={
+ "range": "sheet0",
+ "majorDimension": "ROWS",
+ "values": [[1], ["foo"], [3.0]],
+ },
+ params={"valueInputOption": "USER_ENTERED"},
+ ),
+ mocker.call().json(),
+ ]
+ )