Posted to notifications@superset.apache.org by GitBox <gi...@apache.org> on 2019/08/02 07:10:08 UTC

[GitHub] [incubator-superset] syafiqdante opened a new issue #7972: Celery throws error when using ctas

URL: https://github.com/apache/incubator-superset/issues/7972
 
 
   Make sure these boxes are checked before submitting your issue - thank you!

   - [x] I have checked the superset logs for python stacktraces and included it here as text if there are any.
   - [x] I have reproduced the issue with at least the latest released version of superset.
   - [x] I have checked the issue tracker for the same issue and I haven't found one similar.
   Superset version
   0.28.1
   
   Expected results:
   The CTAS (CREATE TABLE AS SELECT) feature should create a new table from the query results.
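   For reference, a rough sketch of what that rewrite looks like, based on `execute_sql_statement` in sql_lab.py below (the table name here is hypothetical):
   ```
   from superset.sql_parse import ParsedQuery

   # The worker generates a tmp table name (tmp_<user_id>_table_<timestamp>)
   # and rewrites the SELECT as a CREATE TABLE ... AS statement.
   parsed = ParsedQuery('SELECT * FROM some_table')
   print(parsed.as_create_table('tmp_1_table_2019_08_02_09_47_00'))
   # roughly: CREATE TABLE tmp_1_table_2019_08_02_09_47_00 AS
   #          SELECT * FROM some_table
   ```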
   
   Actual results:
   The Celery worker logs the following error:
   ```
   [2019-08-02 09:47:00,417: ERROR/ForkPoolWorker-1] Query with id `202` could not be retrieved
   [2019-08-02 09:47:00,419: ERROR/ForkPoolWorker-1] Sleeping for a sec before retrying...
   [2019-08-02 09:47:01,424: ERROR/ForkPoolWorker-1] Query with id `202` could not be retrieved
   [2019-08-02 09:47:01,424: ERROR/ForkPoolWorker-1] Sleeping for a sec before retrying...
   [2019-08-02 09:47:02,429: ERROR/ForkPoolWorker-1] Query with id `202` could not be retrieved
   [2019-08-02 09:47:02,429: ERROR/ForkPoolWorker-1] Sleeping for a sec before retrying...
   [2019-08-02 09:47:03,433: ERROR/ForkPoolWorker-1] Query with id `202` could not be retrieved
   [2019-08-02 09:47:03,434: ERROR/ForkPoolWorker-1] Sleeping for a sec before retrying...
   [2019-08-02 09:47:04,440: ERROR/ForkPoolWorker-1] Query with id `202` could not be retrieved
   [2019-08-02 09:47:04,440: ERROR/ForkPoolWorker-1] Sleeping for a sec before retrying...
   [2019-08-02 09:47:05,442: ERROR/ForkPoolWorker-1] Failed at getting query
   Traceback (most recent call last):
     File "/root/incubator-superset/superset/sql_lab.py", line 136, in get_sql_results
       session=session, start_time=start_time)
     File "/root/incubator-superset/superset/sql_lab.py", line 224, in execute_sql_statements
       query = get_query(query_id, session)
     File "/root/incubator-superset/superset/sql_lab.py", line 96, in get_query
       raise SqlLabException('Failed at getting query')
   superset.sql_lab.SqlLabException: Failed at getting query
   [2019-08-02 09:47:05,455: ERROR/ForkPoolWorker-1] Query with id `202` could not be retrieved
   [2019-08-02 09:47:05,456: ERROR/ForkPoolWorker-1] Sleeping for a sec before retrying...
   [2019-08-02 09:47:06,461: ERROR/ForkPoolWorker-1] Query with id `202` could not be retrieved
   [2019-08-02 09:47:06,462: ERROR/ForkPoolWorker-1] Sleeping for a sec before retrying...
   [2019-08-02 09:47:07,467: ERROR/ForkPoolWorker-1] Query with id `202` could not be retrieved
   [2019-08-02 09:47:07,467: ERROR/ForkPoolWorker-1] Sleeping for a sec before retrying...
   [2019-08-02 09:47:08,472: ERROR/ForkPoolWorker-1] Query with id `202` could not be retrieved
   [2019-08-02 09:47:08,473: ERROR/ForkPoolWorker-1] Sleeping for a sec before retrying...
   [2019-08-02 09:47:09,477: ERROR/ForkPoolWorker-1] Query with id `202` could not be retrieved
   [2019-08-02 09:47:09,478: ERROR/ForkPoolWorker-1] Sleeping for a sec before retrying...
   [2019-08-02 09:47:10,482: ERROR/ForkPoolWorker-1] Failed at getting query
   Traceback (most recent call last):
     File "/root/incubator-superset/superset/sql_lab.py", line 136, in get_sql_results
       session=session, start_time=start_time)
     File "/root/incubator-superset/superset/sql_lab.py", line 224, in execute_sql_statements
       query = get_query(query_id, session)
     File "/root/incubator-superset/superset/sql_lab.py", line 96, in get_query
       raise SqlLabException('Failed at getting query')
   superset.sql_lab.SqlLabException: Failed at getting query
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/root/incubator-superset/superset/sql_lab.py", line 114, in session_scope
       yield session
     File "/root/incubator-superset/superset/sql_lab.py", line 140, in get_sql_results
       query = get_query(query_id, session)
     File "/root/incubator-superset/superset/sql_lab.py", line 96, in get_query
       raise SqlLabException('Failed at getting query')
   superset.sql_lab.SqlLabException: Failed at getting query
   [2019-08-02 09:47:10,504: ERROR/ForkPoolWorker-1] Task superset.sql_lab.get_sql_results[cb5c14c2-0c78-4ee2-8e00-b8c68f559f0e] raised unexpected: SqlLabException('Failed at getting query',)
   Traceback (most recent call last):
     File "/root/incubator-superset/superset/sql_lab.py", line 136, in get_sql_results
       session=session, start_time=start_time)
     File "/root/incubator-superset/superset/sql_lab.py", line 224, in execute_sql_statements
       query = get_query(query_id, session)
     File "/root/incubator-superset/superset/sql_lab.py", line 96, in get_query
       raise SqlLabException('Failed at getting query')
   superset.sql_lab.SqlLabException: Failed at getting query
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/usr/local/lib/python3.6/dist-packages/celery/app/trace.py", line 385, in trace_task
       R = retval = fun(*args, **kwargs)
     File "/usr/local/lib/python3.6/dist-packages/celery/app/trace.py", line 648, in __protected_call__
       return self.run(*args, **kwargs)
     File "/root/incubator-superset/superset/sql_lab.py", line 140, in get_sql_results
       query = get_query(query_id, session)
     File "/root/incubator-superset/superset/sql_lab.py", line 96, in get_query
       raise SqlLabException('Failed at getting query')
   superset.sql_lab.SqlLabException: Failed at getting query
   ```
   
   My superset_config.py file:
   ```
   from collections import OrderedDict
   import imp
   import json
   import os
   import sys
   
   from celery.schedules import crontab
   from dateutil import tz
   from flask_appbuilder.security.manager import AUTH_DB
   from werkzeug.contrib.cache import RedisCache
   
   from superset.stats_logger import StatsdStatsLogger
   STATS_LOGGER = StatsdStatsLogger(host='localhost', port=8125, prefix='superset')
   
   BASE_DIR = os.path.abspath(os.path.dirname(__file__))
   if "SUPERSET_HOME" in os.environ:
       DATA_DIR = os.environ["SUPERSET_HOME"]
   else:
       DATA_DIR = os.path.join(os.path.expanduser("~"), ".superset")
   
   PACKAGE_DIR = os.path.join(BASE_DIR) #, "static", "assets")
   PACKAGE_FILE = os.path.join(PACKAGE_DIR, "package.json")
   with open(PACKAGE_FILE) as package_file:
       VERSION_STRING = json.load(package_file)["version"]
   
   ROW_LIMIT = 50000
   VIZ_ROW_LIMIT = 10000
   FILTER_SELECT_ROW_LIMIT = 10000
   SUPERSET_WORKERS = 4  # deprecated
   SUPERSET_CELERY_WORKERS = 32  # deprecated
   
   SUPERSET_WEBSERVER_ADDRESS = "0.0.0.0"
   SUPERSET_WEBSERVER_PORT = 8088
   SUPERSET_WEBSERVER_TIMEOUT = 60
   SUPERSET_DASHBOARD_POSITION_DATA_LIMIT = 65535
   EMAIL_NOTIFICATIONS = False
   CUSTOM_SECURITY_MANAGER = None
   SQLALCHEMY_TRACK_MODIFICATIONS = False
   
   SECRET_KEY = "\2\1thisismyscretkey\1\2\e\y\y\h"  # noqa
   
   SQLALCHEMY_DATABASE_URI = 'sqlite:////root/.superset/superset.db'
   
   QUERY_SEARCH_LIMIT = 1000
   
   WTF_CSRF_ENABLED = True
   WTF_CSRF_EXEMPT_LIST = ["superset.views.core.log"]
   DEBUG = os.environ.get("FLASK_ENV") == "development"
   FLASK_USE_RELOAD = True
   
   SHOW_STACKTRACE = True
   ENABLE_PROXY_FIX = False
   APP_NAME = "Superset"
   
   APP_ICON = "/static/assets/images/superset-logo@2x.png"
   APP_ICON_WIDTH = 126
   LOGO_TARGET_PATH = None
   
   DRUID_IS_ACTIVE = True
   DRUID_TZ = tz.tzutc()
   DRUID_ANALYSIS_TYPES = ["cardinality"]
   
   AUTH_TYPE = AUTH_DB
   PUBLIC_ROLE_LIKE_GAMMA = False
   BABEL_DEFAULT_LOCALE = "en"
   BABEL_DEFAULT_FOLDER = "superset/translations"
   LANGUAGES = {
       "en": {"flag": "us", "name": "English"},
       "it": {"flag": "it", "name": "Italian"},
       "fr": {"flag": "fr", "name": "French"},
       "zh": {"flag": "cn", "name": "Chinese"},
       "ja": {"flag": "jp", "name": "Japanese"},
       "de": {"flag": "de", "name": "German"},
       "pt": {"flag": "pt", "name": "Portuguese"},
       "pt_BR": {"flag": "br", "name": "Brazilian Portuguese"},
       "ru": {"flag": "ru", "name": "Russian"},
       "ko": {"flag": "kr", "name": "Korean"},
   }
   
   DEFAULT_FEATURE_FLAGS = {
       "CLIENT_CACHE": False
   }
   
   GET_FEATURE_FLAGS_FUNC = None
   UPLOAD_FOLDER = BASE_DIR + "/app/static/uploads/"
   IMG_UPLOAD_FOLDER = BASE_DIR + "/app/static/uploads/"
   IMG_UPLOAD_URL = "/static/uploads/"
   
   CACHE_DEFAULT_TIMEOUT = 60 * 60 * 24
   
   CACHE_CONFIG = {
       'CACHE_TYPE': 'redis',
       'CACHE_DEFAULT_TIMEOUT': 60 * 60 * 24, # 1 day default (in secs)
       'CACHE_KEY_PREFIX': 'superset_results',
       'CACHE_REDIS_HOST': 'localhost',
       'CACHE_REDIS_PORT': 6379,
       'CACHE_REDIS_DB': 1,
       'CACHE_REDIS_URL': 'redis://localhost:6379/1',
   }
   TABLE_NAMES_CACHE_CONFIG = {"CACHE_TYPE": "null"}
   ENABLE_CORS = False
   CORS_OPTIONS = {}
   SUPERSET_WEBSERVER_DOMAINS = None
   ALLOWED_EXTENSIONS = set(["csv"])
   CSV_EXPORT = {"encoding": "utf-8"}
   
   TIME_GRAIN_BLACKLIST = []
   TIME_GRAIN_ADDONS = {}
   TIME_GRAIN_ADDON_FUNCTIONS = {}
   VIZ_TYPE_BLACKLIST = []
   DRUID_DATA_SOURCE_BLACKLIST = []
   DEFAULT_MODULE_DS_MAP = OrderedDict(
       [
           ("superset.connectors.sqla.models", ["SqlaTable"]),
           ("superset.connectors.druid.models", ["DruidDatasource"]),
       ]
   )
   
   LOG_FORMAT = "%(asctime)s:%(levelname)s:%(name)s:%(message)s"
   LOG_LEVEL = "DEBUG"
   
   ENABLE_TIME_ROTATE = False
   TIME_ROTATE_LOG_LEVEL = "DEBUG"
   FILENAME = os.path.join(DATA_DIR, "superset.log")
   ROLLOVER = "midnight"
   INTERVAL = 1
   BACKUP_COUNT = 30
   
   MAPBOX_API_KEY = os.environ.get("MAPBOX_API_KEY", "")
   SQL_MAX_ROW = 100000
   DISPLAY_MAX_ROW = 10000
   DEFAULT_SQLLAB_LIMIT = 1000
   MAX_TABLE_NAMES = 3000
   SQLLAB_SAVE_WARNING_MESSAGE = None
   SQLLAB_SCHEDULE_WARNING_MESSAGE = None
   
   
   #taken from superset installation page
   class CeleryConfig(object):
       BROKER_URL = 'redis://localhost:6379/0'
       CELERY_IMPORTS = (
           'superset.sql_lab',
           'superset.tasks',
       )
       CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
       CELERYD_LOG_LEVEL = 'DEBUG'
       CELERYD_PREFETCH_MULTIPLIER = 10
       CELERY_ACKS_LATE = True
       CELERY_ANNOTATIONS = {
           'superset.sql_lab.get_sql_results': { #added 'superset.'
               'rate_limit': '100/s',
           },
           'tasks.add': {
               'rate_limit': '10/s'
           },
   
           'email_reports.send': {
               'rate_limit': '1/s',
               'time_limit': 120,
               'soft_time_limit': 150,
               'ignore_result': True,
           },
       }
   
        # both beat schedules in one dict: a second CELERYBEAT_SCHEDULE
        # assignment would overwrite the first
        CELERYBEAT_SCHEDULE = {
            'email_reports.schedule_hourly': {
                'task': 'email_reports.schedule_hourly',
                'schedule': crontab(minute=1, hour='*'),
            },
            'cache-warmup-hourly': {
                'task': 'cache-warmup',
                'schedule': crontab(minute=0, hour='*'),  # hourly
                'kwargs': {
                    'strategy_name': 'top_n_dashboards',
                    'top_n': 5,
                    'since': '7 days ago',
                },
            },
        }
   CELERY_CONFIG = CeleryConfig
   
   HTTP_HEADERS = {}
   DEFAULT_DB_ID = None
   SQLLAB_TIMEOUT = 30
   SQLLAB_VALIDATION_TIMEOUT = 10
   SQLLAB_DEFAULT_DBID = None
   SQLLAB_ASYNC_TIME_LIMIT_SEC = 60 * 60 * 6
   
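    # Async query results are written here by the Celery worker and read back
    # by the web server, so both processes must reach this Redis instance.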
   RESULTS_BACKEND = RedisCache(
       host='localhost',
       port=6379,
       key_prefix='superset_results'
   )
   
   CSV_TO_HIVE_UPLOAD_S3_BUCKET = None
   CSV_TO_HIVE_UPLOAD_DIRECTORY = "EXTERNAL_HIVE_TABLES/"
   UPLOADED_CSV_HIVE_NAMESPACE = None
   JINJA_CONTEXT_ADDONS = {}
   ROBOT_PERMISSION_ROLES = ["Public", "Gamma", "Alpha", "Admin", "sql_lab"]
   CONFIG_PATH_ENV_VAR = "SUPERSET_CONFIG_PATH"
   FLASK_APP_MUTATOR = None
   ENABLE_ACCESS_REQUEST = False
   
   EMAIL_NOTIFICATIONS = False  # all the emails are sent using dryrun
   SMTP_HOST = "localhost"
   SMTP_STARTTLS = True
   SMTP_SSL = False
   SMTP_USER = "superset"
   SMTP_PORT = 25
   SMTP_PASSWORD = "superset"
   SMTP_MAIL_FROM = "superset@superset.com"
   
   if not CACHE_DEFAULT_TIMEOUT:
       CACHE_DEFAULT_TIMEOUT = CACHE_CONFIG.get("CACHE_DEFAULT_TIMEOUT")
   
   SILENCE_FAB = True
   TROUBLESHOOTING_LINK = ""
   WTF_CSRF_TIME_LIMIT = 60 * 60 * 24 * 7
   PERMISSION_INSTRUCTIONS_LINK = ""
   BLUEPRINTS = []
   TRACKING_URL_TRANSFORMER = lambda x: x  # noqa: E731
   HIVE_POLL_INTERVAL = 5
   
   ENABLE_JAVASCRIPT_CONTROLS = False
   DASHBOARD_TEMPLATE_ID = None
   DB_CONNECTION_MUTATOR = None
   SQL_QUERY_MUTATOR = None
   ENABLE_FLASK_COMPRESS = True
   ENABLE_SCHEDULED_EMAIL_REPORTS = False
   SCHEDULED_EMAIL_DEBUG_MODE = False
   EMAIL_REPORTS_CRON_RESOLUTION = 15
   EMAIL_REPORT_FROM_ADDRESS = "reports@superset.org"
   EMAIL_REPORT_BCC_ADDRESS = None
   EMAIL_REPORTS_USER = "admin"
   EMAIL_REPORTS_SUBJECT_PREFIX = "[Report] "
   EMAIL_REPORTS_WEBDRIVER = "firefox"
   
   WEBDRIVER_WINDOW = {"dashboard": (1600, 2000), "slice": (3000, 1200)}
   WEBDRIVER_CONFIGURATION = {}
   WEBDRIVER_BASEURL = "http://0.0.0.0:8080/"
   
   BUG_REPORT_URL = None
   DOCUMENTATION_URL = None
   
   DEFAULT_RELATIVE_START_TIME = "today"
   DEFAULT_RELATIVE_END_TIME = "today"
   
   SQL_VALIDATORS_BY_ENGINE = {"presto": "PrestoDBSQLValidator"}
   
   TALISMAN_ENABLED = False
   TALISMAN_CONFIG = {
       "content_security_policy": None,
       "force_https": True,
       "force_https_permanent": False,
   }
   
   
   SQLALCHEMY_EXAMPLES_URI = None
   
   try:
       if CONFIG_PATH_ENV_VAR in os.environ:
   
           print(
               "Loaded your LOCAL configuration at [{}]".format(
                   os.environ[CONFIG_PATH_ENV_VAR]
               )
           )
           module = sys.modules[__name__]
           override_conf = imp.load_source(
               "superset_config", os.environ[CONFIG_PATH_ENV_VAR]
           )
           for key in dir(override_conf):
               if key.isupper():
                   setattr(module, key, getattr(override_conf, key))
   
       else:
           from superset_config import *  # noqa
           import superset_config
   
           print(
               "Loaded your LOCAL configuration at [{}]".format(superset_config.__file__)
           )
   except ImportError:
       pass
   ```
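    Note that the Celery worker and gunicorn must both read this same configuration, in particular `SQLALCHEMY_DATABASE_URI`, since `get_query` looks the query row up in that metadata database. A minimal hedged check, assuming both run from the same virtualenv:
    ```
    # Print the metadata DB URI the process would use; run once in the
    # environment where gunicorn starts and once where the worker starts.
    from superset import app
    print(app.config.get('SQLALCHEMY_DATABASE_URI'))
    ```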
   
    The superset/sql_lab.py file:
   ```
   from contextlib import closing
   from datetime import datetime
   import logging
   from time import sleep
   import uuid
   from celery.exceptions import SoftTimeLimitExceeded
   from contextlib2 import contextmanager
   from flask_babel import lazy_gettext as _
   import simplejson as json
   import sqlalchemy
   from sqlalchemy.orm import sessionmaker
   from sqlalchemy.pool import NullPool
   from superset import app, dataframe, db, results_backend, security_manager
   from superset.models.sql_lab import Query
   from superset.sql_parse import ParsedQuery
   from superset.tasks.celery_app import app as celery_app
   from superset.utils.core import (
       json_iso_dttm_ser,
       QueryStatus,
       sources,
       zlib_compress,
   )
   from superset.utils.dates import now_as_float
   from superset.utils.decorators import stats_timing
   
   config = app.config
   stats_logger = config.get('STATS_LOGGER')
   SQLLAB_TIMEOUT = config.get('SQLLAB_ASYNC_TIME_LIMIT_SEC', 600)
   log_query = config.get('QUERY_LOGGER')
   
   class SqlLabException(Exception):
       pass
   
   class SqlLabSecurityException(SqlLabException):
       pass
   
   class SqlLabTimeoutException(SqlLabException):
       pass
   
   def handle_query_error(msg, query, session, payload=None):
       """Local method handling error while processing the SQL"""
       payload = payload or {}
       troubleshooting_link = config['TROUBLESHOOTING_LINK']
       query.error_message = msg
       query.status = QueryStatus.FAILED
       query.tmp_table_name = None
       session.commit()
       payload.update({
           'status': query.status,
           'error': msg,
       })
       if troubleshooting_link:
           payload['link'] = troubleshooting_link
       return payload
   
   
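    # Each 'could not be retrieved' / 'Sleeping for a sec' pair in the worker
    # log above is one iteration of this retry loop; after retry_count failed
    # attempts it raises the SqlLabException seen in the traceback.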
   def get_query(query_id, session, retry_count=5):
       """attemps to get the query and retry if it cannot"""
       query = None
       attempt = 0
       while not query and attempt < retry_count:
           try:
               query = session.query(Query).filter_by(id=query_id).one()
           except Exception:
               attempt += 1
               logging.error(
                   'Query with id `{}` could not be retrieved'.format(query_id))
               stats_logger.incr('error_attempting_orm_query_' + str(attempt))
               logging.error('Sleeping for a sec before retrying...')
               sleep(1)
       if not query:
           stats_logger.incr('error_failed_at_getting_orm_query')
           raise SqlLabException('Failed at getting query')
       return query
   
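    # When invoked from a Celery task, get_sql_results passes nullpool=True
    # (ctask.request.called_directly is False), so each task opens a fresh
    # NullPool engine/session against SQLALCHEMY_DATABASE_URI rather than
    # reusing the web app's connection pool.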
   @contextmanager
   def session_scope(nullpool):
       """Provide a transactional scope around a series of operations."""
       if nullpool:
           engine = sqlalchemy.create_engine(
               app.config.get('SQLALCHEMY_DATABASE_URI'), poolclass=NullPool)
           session_class = sessionmaker()
           session_class.configure(bind=engine)
           session = session_class()
       else:
           session = db.session()
           session.commit()  # HACK
   
       try:
           yield session
           session.commit()
       except Exception as e:
           session.rollback()
           logging.exception(e)
           raise
       finally:
           session.close()
   
   
   @celery_app.task(name='superset.sql_lab.get_sql_results', #added 'superset.'
                    bind=True,
                    soft_time_limit=SQLLAB_TIMEOUT)
   def get_sql_results(
       ctask, query_id, rendered_query, return_results=True, store_results=False,
           user_name=None, start_time=None):
       """Executes the sql query returns the results."""
       with session_scope(not ctask.request.called_directly) as session:
   
           try:
               return execute_sql_statements(
                   ctask, query_id, rendered_query, return_results, store_results, user_name,
                   session=session, start_time=start_time)
           except Exception as e:
               logging.exception(e)
               stats_logger.incr('error_sqllab_unhandled')
               query = get_query(query_id, session)
               return handle_query_error(str(e), query, session)
   
   
   def execute_sql_statement(
           sql_statement, query, user_name, session,
           cursor, return_results=False):
       """Executes a single SQL statement"""
       database = query.database
       db_engine_spec = database.db_engine_spec
       parsed_query = ParsedQuery(sql_statement)
       sql = parsed_query.stripped()
       SQL_MAX_ROWS = app.config.get('SQL_MAX_ROW')
   
       if not parsed_query.is_readonly() and not database.allow_dml:
           raise SqlLabSecurityException(
               _('Only `SELECT` statements are allowed against this database'))
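        # CTAS path: generate a tmp table name (tmp_<user_id>_table_<timestamp>)
        # and rewrite the SELECT into a CREATE TABLE ... AS statement.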
       if query.select_as_cta:
           if not parsed_query.is_select():
               raise SqlLabException(_(
                   'Only `SELECT` statements can be used with the CREATE TABLE '
                   'feature.'))
           if not query.tmp_table_name:
               start_dttm = datetime.fromtimestamp(query.start_time)
               query.tmp_table_name = 'tmp_{}_table_{}'.format(
                   query.user_id, start_dttm.strftime('%Y_%m_%d_%H_%M_%S'))
           sql = parsed_query.as_create_table(query.tmp_table_name)
           query.select_as_cta_used = True
       if parsed_query.is_select():
           if SQL_MAX_ROWS and (not query.limit or query.limit > SQL_MAX_ROWS):
               query.limit = SQL_MAX_ROWS
           if query.limit:
               sql = database.apply_limit_to_sql(sql, query.limit)
   
       # Hook to allow environment-specific mutation (usually comments) to the SQL
       SQL_QUERY_MUTATOR = config.get('SQL_QUERY_MUTATOR')
       if SQL_QUERY_MUTATOR:
           sql = SQL_QUERY_MUTATOR(sql, user_name, security_manager, database)
   
       try:
           if log_query:
               log_query(
                   query.database.sqlalchemy_uri,
                   query.executed_sql,
                   query.schema,
                   user_name,
                   __name__,
                   security_manager,
               )
           query.executed_sql = sql
           with stats_timing('sqllab.query.time_executing_query', stats_logger):
               logging.info('Running query: \n{}'.format(sql))
               db_engine_spec.execute(cursor, sql, async_=True)
               logging.info('Handling cursor')
               db_engine_spec.handle_cursor(cursor, query, session)
   
           with stats_timing('sqllab.query.time_fetching_results', stats_logger):
               logging.debug('Fetching data for query object: {}'.format(query.to_dict()))
               data = db_engine_spec.fetch_data(cursor, query.limit)
   
       except SoftTimeLimitExceeded as e:
           logging.exception(e)
           raise SqlLabTimeoutException(
               "SQL Lab timeout. This environment's policy is to kill queries "
               'after {} seconds.'.format(SQLLAB_TIMEOUT))
       except Exception as e:
           logging.exception(e)
           raise SqlLabException(db_engine_spec.extract_error_message(e))
   
       logging.debug('Fetching cursor description')
       cursor_description = cursor.description
       return dataframe.SupersetDataFrame(data, cursor_description, db_engine_spec)
   
   
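    # Splits the rendered query into individual statements, runs each one via
    # execute_sql_statement, and commits progress to the metadata DB as it goes.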
   def execute_sql_statements(
       ctask, query_id, rendered_query, return_results=True, store_results=False,
       user_name=None, session=None, start_time=None,
   ):
       """Executes the sql query returns the results."""
       if store_results and start_time:
           # only asynchronous queries
           stats_logger.timing(
               'sqllab.query.time_pending', now_as_float() - start_time)
   
       query = get_query(query_id, session)
       payload = dict(query_id=query_id)
       database = query.database
       db_engine_spec = database.db_engine_spec
       db_engine_spec.patch()
   
       if store_results and not results_backend:
           raise SqlLabException("Results backend isn't configured.")
   
       # Breaking down into multiple statements
       parsed_query = ParsedQuery(rendered_query)
       statements = parsed_query.get_statements()
       logging.info(f'Executing {len(statements)} statement(s)')
   
       logging.info("Set query to 'running'")
       query.status = QueryStatus.RUNNING
       query.start_running_time = now_as_float()
   
       engine = database.get_sqla_engine(
           schema=query.schema,
           nullpool=True,
           user_name=user_name,
           source=sources.get('sql_lab', None),
       )
   
       with closing(engine.raw_connection()) as conn:
           with closing(conn.cursor()) as cursor:
               statement_count = len(statements)
               for i, statement in enumerate(statements):
                   # TODO CHECK IF STOPPED
                   msg = f'Running statement {i+1} out of {statement_count}'
                   logging.info(msg)
                   query.set_extra_json_key('progress', msg)
                   session.commit()
                   is_last_statement = i == len(statements) - 1
                   try:
                       cdf = execute_sql_statement(
                           statement, query, user_name, session, cursor,
                           return_results=is_last_statement and return_results)
                       msg = f'Running statement {i+1} out of {statement_count}'
                   except Exception as e:
                       msg = str(e)
                       if statement_count > 1:
                           msg = f'[Statement {i+1} out of {statement_count}] ' + msg
                       payload = handle_query_error(msg, query, session, payload)
                       return payload
   
       # Success, updating the query entry in database
       query.rows = cdf.size
       query.progress = 100
       query.set_extra_json_key('progress', None)
       query.status = QueryStatus.SUCCESS
       if query.select_as_cta:
           query.select_sql = database.select_star(
               query.tmp_table_name,
               limit=query.limit,
               schema=database.force_ctas_schema,
               show_cols=False,
               latest_partition=False)
       query.end_time = now_as_float()
       session.commit()
   
       payload.update({
           'status': query.status,
           'data': cdf.data if cdf.data else [],
           'columns': cdf.columns if cdf.columns else [],
           'query': query.to_dict(),
       })
   
       if store_results:
           key = str(uuid.uuid4())
           logging.info(f'Storing results in results backend, key: {key}')
           with stats_timing('sqllab.query.results_backend_write', stats_logger):
               json_payload = json.dumps(
                   payload, default=json_iso_dttm_ser, ignore_nan=True)
               cache_timeout = database.cache_timeout
               if cache_timeout is None:
                   cache_timeout = config.get('CACHE_DEFAULT_TIMEOUT', 0)
               results_backend.set(key, zlib_compress(json_payload), cache_timeout)
           query.results_key = key
       session.commit()
   
       if return_results:
           return payload
   ```
   Steps to reproduce:
    1- install Superset as per the installation tutorial: https://superset.incubator.apache.org/installation.html
    2- run the gunicorn web server:
   ```
   gunicorn \
         -w 10 \
         -k gevent \
         --timeout 120 \
         -b  0.0.0.0:8088 \
         --limit-request-line 0 \
         --limit-request-field_size 0 \
         --statsd-host localhost:8125 \
          --statsd-prefix=service.superset \
          superset:app
    ```
    3- run the Celery worker:
    `celery worker --app=superset.tasks.celery_app:app --pool=prefork -Ofair -c 4`
    4- create a query in SQL Lab and press the CTAS button
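    To rule out a task-name mismatch after adding the 'superset.' prefix in CELERY_ANNOTATIONS, the worker's registered task names can be listed (a hedged check, assuming it is run in the worker's environment):
    ```
    # The task is declared as 'superset.sql_lab.get_sql_results' in sql_lab.py;
    # the worker's registry should contain exactly that name.
    from superset.tasks.celery_app import app as celery_app
    print(sorted(celery_app.tasks.keys()))
    ```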
   
