You are viewing a plain-text version of this content. The canonical link was not preserved in this copy.
Posted to commits@superset.apache.org by ti...@apache.org on 2018/06/25 23:12:08 UTC

[incubator-superset] branch master updated: add more precise types to hive table from csv (#5267)

This is an automated email from the ASF dual-hosted git repository.

timi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-superset.git


The following commit(s) were added to refs/heads/master by this push:
     new b0eee12  add more precise types to hive table from csv (#5267)
b0eee12 is described below

commit b0eee129e9d6bfb3582e5a1c060dbe142b3604b5
Author: timifasubaa <30...@users.noreply.github.com>
AuthorDate: Mon Jun 25 16:12:01 2018 -0700

    add more precise types to hive table from csv (#5267)
---
 requirements.txt            |  1 +
 setup.py                    |  1 +
 superset/db_engine_specs.py | 38 +++++++++++++++++++++++++-------------
 3 files changed, 27 insertions(+), 13 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index f60b57e..aaf97d1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -34,6 +34,7 @@ six==1.11.0
 sqlalchemy==1.2.2
 sqlalchemy-utils==0.32.21
 sqlparse==0.2.4
+tableschema==1.1.0
 thrift==0.11.0
 thrift-sasl==0.3.0
 unicodecsv==0.14.1
diff --git a/setup.py b/setup.py
index 7adccb7..c5aa4b4 100644
--- a/setup.py
+++ b/setup.py
@@ -90,6 +90,7 @@ setup(
         'sqlalchemy',
         'sqlalchemy-utils',
         'sqlparse',
+        'tableschema',
         'thrift>=0.9.3',
         'thrift-sasl>=0.2.1',
         'unicodecsv',
diff --git a/superset/db_engine_specs.py b/superset/db_engine_specs.py
index d6a9144..4f6b22e 100644
--- a/superset/db_engine_specs.py
+++ b/superset/db_engine_specs.py
@@ -37,7 +37,7 @@ from sqlalchemy.engine.url import make_url
 from sqlalchemy.sql import text
 from sqlalchemy.sql.expression import TextAsFrom
 import sqlparse
-import unicodecsv
+from tableschema import Table
 from werkzeug.utils import secure_filename
 
 from superset import app, cache_util, conf, db, utils
@@ -134,7 +134,7 @@ class BaseEngineSpec(object):
     @staticmethod
     def csv_to_df(**kwargs):
         kwargs['filepath_or_buffer'] = \
-            app.config['UPLOAD_FOLDER'] + kwargs['filepath_or_buffer']
+            config['UPLOAD_FOLDER'] + kwargs['filepath_or_buffer']
         kwargs['encoding'] = 'utf-8'
         kwargs['iterator'] = True
         chunks = pandas.read_csv(**kwargs)
@@ -156,7 +156,7 @@ class BaseEngineSpec(object):
         def _allowed_file(filename):
             # Only allow specific file extensions as specified in the config
             extension = os.path.splitext(filename)[1]
-            return extension and extension[1:] in app.config['ALLOWED_EXTENSIONS']
+            return extension and extension[1:] in config['ALLOWED_EXTENSIONS']
 
         filename = secure_filename(form.csv_file.data.filename)
         if not _allowed_file(filename):
@@ -973,9 +973,15 @@ class HiveEngineSpec(PrestoEngineSpec):
     @staticmethod
     def create_table_from_csv(form, table):
         """Uploads a csv file and creates a superset datasource in Hive."""
-        def get_column_names(filepath):
-            with open(filepath, 'rb') as f:
-                return next(unicodecsv.reader(f, encoding='utf-8-sig'))
+        def convert_to_hive_type(col_type):
+            """maps tableschema's types to hive types"""
+            tableschema_to_hive_types = {
+                'boolean': 'BOOLEAN',
+                'integer': 'INT',
+                'number': 'DOUBLE',
+                'string': 'STRING',
+            }
+            return tableschema_to_hive_types.get(col_type, 'STRING')
 
         table_name = form.name.data
         if config.get('UPLOADED_CSV_HIVE_NAMESPACE'):
@@ -988,21 +994,27 @@ class HiveEngineSpec(PrestoEngineSpec):
                 config.get('UPLOADED_CSV_HIVE_NAMESPACE'), table_name)
         filename = form.csv_file.data.filename
 
-        bucket_path = app.config['CSV_TO_HIVE_UPLOAD_S3_BUCKET']
+        bucket_path = config['CSV_TO_HIVE_UPLOAD_S3_BUCKET']
 
         if not bucket_path:
             logging.info('No upload bucket specified')
             raise Exception(
                 'No upload bucket specified. You can specify one in the config file.')
 
-        upload_prefix = app.config['CSV_TO_HIVE_UPLOAD_DIRECTORY']
-        dest_path = os.path.join(table_name, filename)
+        table_name = form.name.data
+        filename = form.csv_file.data.filename
+        upload_prefix = config['CSV_TO_HIVE_UPLOAD_DIRECTORY']
 
-        upload_path = app.config['UPLOAD_FOLDER'] + \
+        upload_path = config['UPLOAD_FOLDER'] + \
             secure_filename(form.csv_file.data.filename)
-        column_names = get_column_names(upload_path)
-        schema_definition = ', '.join(
-            [s + ' STRING ' for s in column_names])
+
+        hive_table_schema = Table(upload_path).infer()
+        column_name_and_type = []
+        for column_info in hive_table_schema['fields']:
+            column_name_and_type.append(
+                '{} {}'.format(
+                    column_info['name'], convert_to_hive_type(column_info['type'])))
+        schema_definition = ', '.join(column_name_and_type)
 
         s3 = boto3.client('s3')
         location = os.path.join('s3a://', bucket_path, upload_prefix, table_name)