You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@superset.apache.org by be...@apache.org on 2023/08/08 23:02:27 UTC
[superset] 01/01: feat(gsheets): file upload
This is an automated email from the ASF dual-hosted git repository.
beto pushed a commit to branch upload_gsheets
in repository https://gitbox.apache.org/repos/asf/superset.git
commit 06ffb116b3ef255b691475e2ee8e8b486f2b224c
Author: Beto Dealmeida <ro...@dealmeida.net>
AuthorDate: Tue Aug 8 15:58:56 2023 -0700
feat(gsheets): file upload
---
superset/db_engine_specs/gsheets.py | 151 ++++++++++++++++++++++++++++++++----
1 file changed, 136 insertions(+), 15 deletions(-)
diff --git a/superset/db_engine_specs/gsheets.py b/superset/db_engine_specs/gsheets.py
index 777499a8f9..d9c818c614 100644
--- a/superset/db_engine_specs/gsheets.py
+++ b/superset/db_engine_specs/gsheets.py
@@ -14,30 +14,42 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
+from __future__ import annotations
+
import json
+import logging
import re
from re import Pattern
-from typing import Any, Optional, TYPE_CHECKING
+from typing import Any, TYPE_CHECKING
+import pandas as pd
from apispec import APISpec
from apispec.ext.marshmallow import MarshmallowPlugin
from flask import g
from flask_babel import gettext as __
from marshmallow import fields, Schema
from marshmallow.exceptions import ValidationError
+from requests import Session
+from shillelagh.backends.apsw.dialects.base import get_adapter_for_table_name
from sqlalchemy.engine import create_engine
from sqlalchemy.engine.url import URL
from typing_extensions import TypedDict
-from superset import security_manager
+from superset import db, security_manager
from superset.constants import PASSWORD_MASK
from superset.databases.schemas import encrypted_field_properties, EncryptedString
from superset.db_engine_specs.shillelagh import ShillelaghEngineSpec
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
+from superset.exceptions import SupersetException
if TYPE_CHECKING:
from superset.models.core import Database
+ from superset.sql_parse import Table
+_logger = logging.getLogger()
+
+EXAMPLE_GSHEETS_URL = "https://docs.google.com/spreadsheets/d/1LcWZMsdCl92g7nA-D6qGRqg1T5TiHyuKJUY1u9XAnsk/edit#gid=0"
SYNTAX_ERROR_REGEX = re.compile('SQLError: near "(?P<server_error>.*?)": syntax error')
@@ -57,7 +69,7 @@ class GSheetsParametersSchema(Schema):
class GSheetsParametersType(TypedDict):
service_account_info: str
- catalog: Optional[dict[str, str]]
+ catalog: dict[str, str] | None
class GSheetsPropertiesType(TypedDict):
@@ -88,14 +100,14 @@ class GSheetsEngineSpec(ShillelaghEngineSpec):
),
}
- supports_file_upload = False
+ supports_file_upload = True
@classmethod
def get_url_for_impersonation(
cls,
url: URL,
impersonate_user: bool,
- username: Optional[str],
+ username: str | None,
) -> URL:
if impersonate_user and username is not None:
user = security_manager.find_user(username=username)
@@ -107,9 +119,9 @@ class GSheetsEngineSpec(ShillelaghEngineSpec):
@classmethod
def extra_table_metadata(
cls,
- database: "Database",
+ database: Database,
table_name: str,
- schema_name: Optional[str],
+ schema_name: str | None,
) -> dict[str, Any]:
with database.get_raw_connection(schema=schema_name) as conn:
cursor = conn.cursor()
@@ -126,9 +138,8 @@ class GSheetsEngineSpec(ShillelaghEngineSpec):
def build_sqlalchemy_uri(
cls,
_: GSheetsParametersType,
- encrypted_extra: Optional[ # pylint: disable=unused-argument
- dict[str, Any]
- ] = None,
+ encrypted_extra: None
+ | (dict[str, Any]) = None, # pylint: disable=unused-argument
) -> str:
return "gsheets://"
@@ -136,7 +147,7 @@ class GSheetsEngineSpec(ShillelaghEngineSpec):
def get_parameters_from_uri(
cls,
uri: str, # pylint: disable=unused-argument
- encrypted_extra: Optional[dict[str, Any]] = None,
+ encrypted_extra: dict[str, Any] | None = None,
) -> Any:
# Building parameters from encrypted_extra and uri
if encrypted_extra:
@@ -145,7 +156,7 @@ class GSheetsEngineSpec(ShillelaghEngineSpec):
raise ValidationError("Invalid service credentials")
@classmethod
- def mask_encrypted_extra(cls, encrypted_extra: Optional[str]) -> Optional[str]:
+ def mask_encrypted_extra(cls, encrypted_extra: str | None) -> str | None:
if encrypted_extra is None:
return encrypted_extra
@@ -162,9 +173,7 @@ class GSheetsEngineSpec(ShillelaghEngineSpec):
return json.dumps(config)
@classmethod
- def unmask_encrypted_extra(
- cls, old: Optional[str], new: Optional[str]
- ) -> Optional[str]:
+ def unmask_encrypted_extra(cls, old: str | None, new: str | None) -> str | None:
"""
Reuse ``private_key`` if available and unchanged.
"""
@@ -299,3 +308,115 @@ class GSheetsEngineSpec(ShillelaghEngineSpec):
)
idx += 1
return errors
+
+    @staticmethod
+    def _do_post(
+        session: Session,
+        url: str,
+        body: dict[str, Any],
+        **kwargs: Any,
+    ) -> dict[str, Any]:
+        """
+        POST a JSON body to the Google API and return the decoded response.
+
+        Helper function that handles logging and error handling.
+
+        :param session: session used to send the request
+        :param url: endpoint to POST to
+        :param body: payload, sent JSON-encoded
+        :param kwargs: extra keyword arguments forwarded to ``session.post``
+        :returns: the decoded JSON payload of the response
+        :raises SupersetException: if the response body is not valid JSON, or if
+            the payload carries an ``error`` entry
+        """
+        _logger.info("POST %s", url)
+        _logger.debug(body)
+        response = session.post(
+            url,
+            json=body,
+            **kwargs,
+        )
+
+        try:
+            payload = response.json()
+        except ValueError as ex:
+            # The body was not JSON (e.g. an HTML error page or an empty reply);
+            # surface it as the same exception type used for API-level errors
+            # instead of leaking a raw decoding error.
+            raise SupersetException(
+                f"Invalid response from the Google API (HTTP {response.status_code})"
+            ) from ex
+        _logger.debug(payload)
+
+        if "error" in payload:
+            raise SupersetException(payload["error"]["message"])
+
+        return payload
+
+    @classmethod
+    def df_to_sql(
+        cls,
+        database: Database,
+        table: Table,
+        df: pd.DataFrame,
+        to_sql_kwargs: dict[str, Any],
+    ) -> None:
+        """
+        Create a new sheet and update the DB catalog.
+
+        Since Google Sheets is not a database, uploading a file is slightly different
+        from other traditional databases. To create a table with a given name we first
+        create a spreadsheet with the contents of the dataframe, and we later update the
+        database catalog to add a mapping between the desired table name and the URL of
+        the new sheet.
+
+        If the table name is already present in the catalog the existing sheet is
+        cleared and refilled, unless ``to_sql_kwargs["if_exists"]`` is ``"append"``
+        or ``"fail"``.
+
+        :param database: database whose ``extra`` JSON holds the sheets catalog
+        :param table: target table; only ``table.table`` (the name) is used
+        :param df: dataframe to upload; column names become the first row
+        :param to_sql_kwargs: upload options; only ``if_exists`` is consulted
+        :raises SupersetException: if the table exists and ``if_exists`` is
+            ``"append"`` or ``"fail"``, or if a Google API call fails
+        """
+        # grab the existing catalog, if any
+        extra = database.get_extra()
+        engine_params = extra.setdefault("engine_params", {})
+        catalog = engine_params.setdefault("catalog", {})
+
+        # sanity checks; note that an existing table with no ``if_exists`` key at
+        # all falls through and is replaced
+        spreadsheet_url = catalog.get(table.table)
+        if spreadsheet_url and "if_exists" in to_sql_kwargs:
+            if to_sql_kwargs["if_exists"] == "append":
+                # no way we're going to append a dataframe to a spreadsheet, that's
+                # never going to work
+                raise SupersetException("Append operation not currently supported")
+            if to_sql_kwargs["if_exists"] == "fail":
+                raise SupersetException("Table already exists")
+
+        # get the Google session from the Shillelagh adapter
+        with cls.get_engine(database) as engine:
+            with engine.connect() as conn:
+                # any GSheets URL will work to get a working session
+                adapter = get_adapter_for_table_name(
+                    conn,
+                    spreadsheet_url or EXAMPLE_GSHEETS_URL,
+                )
+                session = adapter._get_session()  # pylint: disable=protected-access
+
+        # create or replace sheet; both branches must leave ``spreadsheet_id`` and
+        # ``range_`` defined, since the insert below needs them
+        if spreadsheet_url:
+            # presumably the adapter has resolved these for the catalog URL by the
+            # time it is returned -- TODO confirm against the Shillelagh adapter
+            spreadsheet_id = adapter._spreadsheet_id  # pylint: disable=protected-access
+            range_ = adapter._sheet_name  # pylint: disable=protected-access
+            url = (
+                "https://sheets.googleapis.com/v4/spreadsheets/"
+                f"{spreadsheet_id}/values/'{range_}':clear"
+            )
+            cls._do_post(session, url, {})
+        else:
+            payload = cls._do_post(
+                session,
+                "https://sheets.googleapis.com/v4/spreadsheets",
+                {"properties": {"title": table.table}},
+            )
+            spreadsheet_id = payload["spreadsheetId"]
+            spreadsheet_url = payload["spreadsheetUrl"]
+            range_ = payload["sheets"][0]["properties"]["title"]
+
+        # insert data: the column names go first so the sheet has a header row, and
+        # missing values become empty cells because NaN is not valid JSON
+        rows = [df.columns.tolist(), *df.fillna("").values.tolist()]
+        body = {
+            "range": range_,
+            "majorDimension": "ROWS",
+            "values": rows,
+        }
+        url = (
+            "https://sheets.googleapis.com/v4/spreadsheets/"
+            f"{spreadsheet_id}/values/{range_}:append"
+        )
+        cls._do_post(
+            session,
+            url,
+            body,
+            params={"valueInputOption": "USER_ENTERED"},
+        )
+
+        # update catalog
+        catalog[table.table] = spreadsheet_url
+        database.extra = json.dumps(extra)
+        db.session.add(database)
+        db.session.commit()