You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@superset.apache.org by be...@apache.org on 2023/08/08 23:02:27 UTC

[superset] 01/01: feat(gsheets): file upload

This is an automated email from the ASF dual-hosted git repository.

beto pushed a commit to branch upload_gsheets
in repository https://gitbox.apache.org/repos/asf/superset.git

commit 06ffb116b3ef255b691475e2ee8e8b486f2b224c
Author: Beto Dealmeida <ro...@dealmeida.net>
AuthorDate: Tue Aug 8 15:58:56 2023 -0700

    feat(gsheets): file upload
---
 superset/db_engine_specs/gsheets.py | 151 ++++++++++++++++++++++++++++++++----
 1 file changed, 136 insertions(+), 15 deletions(-)

diff --git a/superset/db_engine_specs/gsheets.py b/superset/db_engine_specs/gsheets.py
index 777499a8f9..d9c818c614 100644
--- a/superset/db_engine_specs/gsheets.py
+++ b/superset/db_engine_specs/gsheets.py
@@ -14,30 +14,42 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
+from __future__ import annotations
+
 import json
+import logging
 import re
 from re import Pattern
-from typing import Any, Optional, TYPE_CHECKING
+from typing import Any, TYPE_CHECKING
 
+import pandas as pd
 from apispec import APISpec
 from apispec.ext.marshmallow import MarshmallowPlugin
 from flask import g
 from flask_babel import gettext as __
 from marshmallow import fields, Schema
 from marshmallow.exceptions import ValidationError
+from requests import Session
+from shillelagh.backends.apsw.dialects.base import get_adapter_for_table_name
 from sqlalchemy.engine import create_engine
 from sqlalchemy.engine.url import URL
 from typing_extensions import TypedDict
 
-from superset import security_manager
+from superset import db, security_manager
 from superset.constants import PASSWORD_MASK
 from superset.databases.schemas import encrypted_field_properties, EncryptedString
 from superset.db_engine_specs.shillelagh import ShillelaghEngineSpec
 from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
+from superset.exceptions import SupersetException
 
 if TYPE_CHECKING:
     from superset.models.core import Database
+    from superset.sql_parse import Table
 
+_logger = logging.getLogger(__name__)
+# placeholder sheet, used only to obtain an adapter (and its session) for new tables
+EXAMPLE_GSHEETS_URL = "https://docs.google.com/spreadsheets/d/1LcWZMsdCl92g7nA-D6qGRqg1T5TiHyuKJUY1u9XAnsk/edit#gid=0"
 
 SYNTAX_ERROR_REGEX = re.compile('SQLError: near "(?P<server_error>.*?)": syntax error')
 
@@ -57,7 +69,7 @@ class GSheetsParametersSchema(Schema):
 
 class GSheetsParametersType(TypedDict):
     service_account_info: str
-    catalog: Optional[dict[str, str]]
+    catalog: dict[str, str] | None
 
 
 class GSheetsPropertiesType(TypedDict):
@@ -88,14 +100,14 @@ class GSheetsEngineSpec(ShillelaghEngineSpec):
         ),
     }
 
-    supports_file_upload = False
+    supports_file_upload = True
 
     @classmethod
     def get_url_for_impersonation(
         cls,
         url: URL,
         impersonate_user: bool,
-        username: Optional[str],
+        username: str | None,
     ) -> URL:
         if impersonate_user and username is not None:
             user = security_manager.find_user(username=username)
@@ -107,9 +119,9 @@ class GSheetsEngineSpec(ShillelaghEngineSpec):
     @classmethod
     def extra_table_metadata(
         cls,
-        database: "Database",
+        database: Database,
         table_name: str,
-        schema_name: Optional[str],
+        schema_name: str | None,
     ) -> dict[str, Any]:
         with database.get_raw_connection(schema=schema_name) as conn:
             cursor = conn.cursor()
@@ -126,9 +138,8 @@ class GSheetsEngineSpec(ShillelaghEngineSpec):
     def build_sqlalchemy_uri(
         cls,
         _: GSheetsParametersType,
-        encrypted_extra: Optional[  # pylint: disable=unused-argument
-            dict[str, Any]
-        ] = None,
+        encrypted_extra: None
+        | (dict[str, Any]) = None,  # pylint: disable=unused-argument
     ) -> str:
         return "gsheets://"
 
@@ -136,7 +147,7 @@ class GSheetsEngineSpec(ShillelaghEngineSpec):
     def get_parameters_from_uri(
         cls,
         uri: str,  # pylint: disable=unused-argument
-        encrypted_extra: Optional[dict[str, Any]] = None,
+        encrypted_extra: dict[str, Any] | None = None,
     ) -> Any:
         # Building parameters from encrypted_extra and uri
         if encrypted_extra:
@@ -145,7 +156,7 @@ class GSheetsEngineSpec(ShillelaghEngineSpec):
         raise ValidationError("Invalid service credentials")
 
     @classmethod
-    def mask_encrypted_extra(cls, encrypted_extra: Optional[str]) -> Optional[str]:
+    def mask_encrypted_extra(cls, encrypted_extra: str | None) -> str | None:
         if encrypted_extra is None:
             return encrypted_extra
 
@@ -162,9 +173,7 @@ class GSheetsEngineSpec(ShillelaghEngineSpec):
         return json.dumps(config)
 
     @classmethod
-    def unmask_encrypted_extra(
-        cls, old: Optional[str], new: Optional[str]
-    ) -> Optional[str]:
+    def unmask_encrypted_extra(cls, old: str | None, new: str | None) -> str | None:
         """
         Reuse ``private_key`` if available and unchanged.
         """
@@ -299,3 +308,115 @@ class GSheetsEngineSpec(ShillelaghEngineSpec):
                 )
             idx += 1
         return errors
+
+    @staticmethod
+    def _do_post(
+        session: Session,
+        url: str,
+        body: dict[str, Any],
+        **kwargs: Any,
+    ) -> dict[str, Any]:
+        """
+        Send ``body`` as JSON in a POST request and return the decoded reply.
+
+        The URL is logged at INFO level; the request body and the response
+        payload are logged at DEBUG level. Extra keyword arguments are passed
+        through to ``session.post``.
+
+        :raises SupersetException: when the reply has an ``error`` key, using
+            the ``message`` found under it
+        """
+        _logger.info("POST %s", url)
+        _logger.debug(body)
+
+        result = session.post(url, json=body, **kwargs).json()
+        _logger.debug(result)
+
+        if "error" not in result:
+            return result
+
+        raise SupersetException(result["error"]["message"])
+
+    @classmethod
+    def df_to_sql(
+        cls,
+        database: Database,
+        table: Table,
+        df: pd.DataFrame,
+        to_sql_kwargs: dict[str, Any],
+    ) -> None:
+        """
+        Create a new sheet and update the DB catalog.
+
+        Since Google Sheets is not a database, uploading a file is slightly different
+        from other traditional databases. To create a table with a given name we first
+        create a spreadsheet with the contents of the dataframe, and we later update the
+        database catalog to add a mapping between the desired table name and the URL of
+        the new sheet.
+
+        If the table is already in the catalog its sheet is cleared and refilled,
+        unless ``if_exists`` is ``append`` or ``fail``, which raise an exception.
+        """
+        # grab the existing catalog, if any
+        extra = database.get_extra()
+        engine_params = extra.setdefault("engine_params", {})
+        catalog = engine_params.setdefault("catalog", {})
+
+        # sanity checks; "replace" (or no ``if_exists`` at all) falls through
+        spreadsheet_url = catalog.get(table.table)
+        if_exists = to_sql_kwargs.get("if_exists")
+        if spreadsheet_url and if_exists == "append":
+            # appending a dataframe to an existing spreadsheet is not implemented
+            raise SupersetException("Append operation not currently supported")
+        if spreadsheet_url and if_exists == "fail":
+            raise SupersetException("Table already exists")
+
+        # get the Google session from the Shillelagh adapter
+        with cls.get_engine(database) as engine:
+            with engine.connect() as conn:
+                # any GSheets URL will work to get a working session
+                adapter = get_adapter_for_table_name(
+                    conn,
+                    spreadsheet_url or EXAMPLE_GSHEETS_URL,
+                )
+                session = adapter._get_session()  # pylint: disable=protected-access
+
+        # clear the existing sheet, or create a new spreadsheet
+        if spreadsheet_url:
+            # the data is written back to the sheet that was just cleared, so the
+            # id and the (quoted) range must come from the adapter
+            spreadsheet_id = adapter._spreadsheet_id  # pylint: disable=protected-access
+            range_ = f"'{adapter._sheet_name}'"  # pylint: disable=protected-access
+            url = (
+                "https://sheets.googleapis.com/v4/spreadsheets/"
+                f"{spreadsheet_id}/values/{range_}:clear"
+            )
+            cls._do_post(session, url, {})
+        else:
+            payload = cls._do_post(
+                session,
+                "https://sheets.googleapis.com/v4/spreadsheets",
+                {"properties": {"title": table.table}},
+            )
+            spreadsheet_id = payload["spreadsheetId"]
+            spreadsheet_url = payload["spreadsheetUrl"]
+            range_ = payload["sheets"][0]["properties"]["title"]
+
+        # insert data; the column names go in as the first row, and missing values
+        # become empty strings since NaN is not valid JSON
+        body = {
+            "range": range_,
+            "majorDimension": "ROWS",
+            "values": [df.columns.tolist(), *df.fillna("").values.tolist()],
+        }
+        url = (
+            "https://sheets.googleapis.com/v4/spreadsheets/"
+            f"{spreadsheet_id}/values/{range_}:append"
+        )
+        cls._do_post(session, url, body, params={"valueInputOption": "USER_ENTERED"})
+
+        # update catalog
+        catalog[table.table] = spreadsheet_url
+        database.extra = json.dumps(extra)
+        db.session.add(database)
+        db.session.commit()