You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@superset.apache.org by ma...@apache.org on 2019/06/23 21:20:26 UTC
[incubator-superset] branch master updated: Add csv upload support
for BigQuery (#7756)
This is an automated email from the ASF dual-hosted git repository.
maximebeauchemin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-superset.git
The following commit(s) were added to refs/heads/master by this push:
new 1c4092c Add csv upload support for BigQuery (#7756)
1c4092c is described below
commit 1c4092c61cc8aeb3f3c0dabb7831813e514dfbdd
Author: Ville Brofeldt <33...@users.noreply.github.com>
AuthorDate: Mon Jun 24 00:20:09 2019 +0300
Add csv upload support for BigQuery (#7756)
* Add extra_require for bigquery to setup.py
* Refactor df_to_db and add df upload capability for BigQuery
* Fix unit tests and clarify kwarg logic
* Fix flake8 errors
* Add minimum versions for bigquery dependencies
* wrap to_gbq in try-catch block and raise error if pandas-gbq is missing
* Fix linting error and make error more generic
---
setup.py | 4 +++
superset/db_engine_specs/base.py | 49 +++++++++++++++++++++++-------------
superset/db_engine_specs/bigquery.py | 37 ++++++++++++++++++++++++---
superset/db_engine_specs/hive.py | 4 +--
tests/db_engine_specs_test.py | 6 ++---
5 files changed, 74 insertions(+), 26 deletions(-)
diff --git a/setup.py b/setup.py
index 420375b..28c6bbd 100644
--- a/setup.py
+++ b/setup.py
@@ -108,6 +108,10 @@ setup(
'wtforms-json',
],
extras_require={
+ 'bigquery': [
+ 'pybigquery>=0.4.10',
+ 'pandas_gbq>=0.10.0',
+ ],
'cors': ['flask-cors>=2.0.0'],
'hive': [
'pyhive[hive]>=0.6.1',
diff --git a/superset/db_engine_specs/base.py b/superset/db_engine_specs/base.py
index da80d86..5254bae 100644
--- a/superset/db_engine_specs/base.py
+++ b/superset/db_engine_specs/base.py
@@ -230,36 +230,45 @@ class BaseEngineSpec(object):
return parsed_query.get_query_with_new_limit(limit)
@staticmethod
- def csv_to_df(**kwargs):
+ def csv_to_df(**kwargs) -> pd.DataFrame:
+ """ Read csv into Pandas DataFrame
+ :param kwargs: params to be passed to pandas.read_csv
+ :return: Pandas DataFrame containing data from csv
+ """
kwargs['filepath_or_buffer'] = \
config['UPLOAD_FOLDER'] + kwargs['filepath_or_buffer']
kwargs['encoding'] = 'utf-8'
kwargs['iterator'] = True
chunks = pd.read_csv(**kwargs)
- df = pd.DataFrame()
df = pd.concat(chunk for chunk in chunks)
return df
- @staticmethod
- def df_to_db(df, table, **kwargs):
+ @classmethod
+ def df_to_sql(cls, df: pd.DataFrame, **kwargs):
+ """ Upload data from a Pandas DataFrame to a database. For
+ regular engines this calls the DataFrame.to_sql() method. Can be
+ overridden for engines that don't work well with to_sql(), e.g.
+ BigQuery.
+ :param df: Dataframe with data to be uploaded
+ :param kwargs: kwargs to be passed to to_sql() method
+ """
df.to_sql(**kwargs)
- table.user_id = g.user.id
- table.schema = kwargs['schema']
- table.fetch_metadata()
- db.session.add(table)
- db.session.commit()
- @staticmethod
- def create_table_from_csv(form, table):
- def _allowed_file(filename):
+ @classmethod
+ def create_table_from_csv(cls, form, table):
+ """ Create table (including metadata in backend) from contents of a csv.
+ :param form: Parameters defining how to process data
+ :param table: Metadata of new table to be created
+ """
+ def _allowed_file(filename: str) -> bool:
# Only allow specific file extensions as specified in the config
extension = os.path.splitext(filename)[1]
- return extension and extension[1:] in config['ALLOWED_EXTENSIONS']
+ return extension is not None and extension[1:] in config['ALLOWED_EXTENSIONS']
filename = secure_filename(form.csv_file.data.filename)
if not _allowed_file(filename):
raise Exception('Invalid file type selected')
- kwargs = {
+ csv_to_df_kwargs = {
'filepath_or_buffer': filename,
'sep': form.sep.data,
'header': form.header.data if form.header.data else 0,
@@ -273,10 +282,9 @@ class BaseEngineSpec(object):
'infer_datetime_format': form.infer_datetime_format.data,
'chunksize': 10000,
}
- df = BaseEngineSpec.csv_to_df(**kwargs)
+ df = cls.csv_to_df(**csv_to_df_kwargs)
- df_to_db_kwargs = {
- 'table': table,
+ df_to_sql_kwargs = {
'df': df,
'name': form.name.data,
'con': create_engine(form.con.data.sqlalchemy_uri_decrypted, echo=False),
@@ -286,8 +294,13 @@ class BaseEngineSpec(object):
'index_label': form.index_label.data,
'chunksize': 10000,
}
+ cls.df_to_sql(**df_to_sql_kwargs)
- BaseEngineSpec.df_to_db(**df_to_db_kwargs)
+ table.user_id = g.user.id
+ table.schema = form.schema.data
+ table.fetch_metadata()
+ db.session.add(table)
+ db.session.commit()
@classmethod
def convert_dttm(cls, target_type, dttm):
diff --git a/superset/db_engine_specs/bigquery.py b/superset/db_engine_specs/bigquery.py
index af6e066..954966f 100644
--- a/superset/db_engine_specs/bigquery.py
+++ b/superset/db_engine_specs/bigquery.py
@@ -14,10 +14,10 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-# pylint: disable=C,R,W
import hashlib
import re
+import pandas as pd
from sqlalchemy import literal_column
from superset.db_engine_specs.base import BaseEngineSpec
@@ -86,8 +86,8 @@ class BigQueryEngineSpec(BaseEngineSpec):
# replace non-alphanumeric characters with underscores
label_mutated = re.sub(r'[^\w]+', '_', label_mutated)
if label_mutated != label:
- # add md5 hash to label to avoid possible collisions
- label_mutated += label_hashed
+ # add first 5 chars from md5 hash to label to avoid possible collisions
+ label_mutated += label_hashed[:6]
return label_mutated
@@ -141,3 +141,34 @@ class BigQueryEngineSpec(BaseEngineSpec):
@classmethod
def epoch_ms_to_dttm(cls):
return 'TIMESTAMP_MILLIS({col})'
+
+ @classmethod
+ def df_to_sql(cls, df: pd.DataFrame, **kwargs):
+ """
+ Upload data from a Pandas DataFrame to BigQuery. Calls
+ `DataFrame.to_gbq()` which requires `pandas_gbq` to be installed.
+
+ :param df: Dataframe with data to be uploaded
+ :param kwargs: kwargs to be passed to to_gbq() method. Requires both `schema`
+ and `name` to be present in kwargs, which are combined and passed to
+ `to_gbq()` as `destination_table`.
+ """
+ try:
+ import pandas_gbq
+ except ImportError:
+ raise Exception('Could not import the library `pandas_gbq`, which is '
+ 'required to be installed in your environment in order '
+ 'to upload data to BigQuery')
+
+ if not ('name' in kwargs and 'schema' in kwargs):
+ raise Exception('name and schema need to be defined in kwargs')
+ gbq_kwargs = {}
+ gbq_kwargs['project_id'] = kwargs['con'].engine.url.host
+ gbq_kwargs['destination_table'] = f"{kwargs.pop('schema')}.{kwargs.pop('name')}"
+
+ # Only pass through supported kwargs
+ supported_kwarg_keys = {'if_exists'}
+ for key in supported_kwarg_keys:
+ if key in kwargs:
+ gbq_kwargs[key] = kwargs[key]
+ pandas_gbq.to_gbq(df, **gbq_kwargs)
diff --git a/superset/db_engine_specs/hive.py b/superset/db_engine_specs/hive.py
index 301b132..4142745 100644
--- a/superset/db_engine_specs/hive.py
+++ b/superset/db_engine_specs/hive.py
@@ -94,8 +94,8 @@ class HiveEngineSpec(PrestoEngineSpec):
except pyhive.exc.ProgrammingError:
return []
- @staticmethod
- def create_table_from_csv(form, table):
+ @classmethod
+ def create_table_from_csv(cls, form, table):
"""Uploads a csv file and creates a superset datasource in Hive."""
def convert_to_hive_type(col_type):
"""maps tableschema's types to hive types"""
diff --git a/tests/db_engine_specs_test.py b/tests/db_engine_specs_test.py
index cdc08cd..6acc8ba 100644
--- a/tests/db_engine_specs_test.py
+++ b/tests/db_engine_specs_test.py
@@ -702,15 +702,15 @@ class DbEngineSpecsTestCase(SupersetTestCase):
self.assertEqual(label, label_expected)
label = BigQueryEngineSpec.make_label_compatible(column('SUM(x)').name)
- label_expected = 'SUM_x__5f110b965a993675bc4953bb3e03c4a5'
+ label_expected = 'SUM_x__5f110'
self.assertEqual(label, label_expected)
label = BigQueryEngineSpec.make_label_compatible(column('SUM[x]').name)
- label_expected = 'SUM_x__7ebe14a3f9534aeee125449b0bc083a8'
+ label_expected = 'SUM_x__7ebe1'
self.assertEqual(label, label_expected)
label = BigQueryEngineSpec.make_label_compatible(column('12345_col').name)
- label_expected = '_12345_col_8d3906e2ea99332eb185f7f8ecb2ffd6'
+ label_expected = '_12345_col_8d390'
self.assertEqual(label, label_expected)
def test_oracle_sqla_column_name_length_exceeded(self):