Posted to commits@airflow.apache.org by "Ash Berlin-Taylor (Jira)" <ji...@apache.org> on 2019/12/11 13:06:00 UTC

[jira] [Closed] (AIRFLOW-6202) sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback()

     [ https://issues.apache.org/jira/browse/AIRFLOW-6202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ash Berlin-Taylor closed AIRFLOW-6202.
--------------------------------------
    Resolution: Cannot Reproduce

1.9.0 is almost two years old and doesn't receive any more fixes. Please try with 1.10.5 and re-open this issue if it is still a problem.

> sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback()
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-6202
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6202
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: database
>    Affects Versions: 1.9.0
>         Environment: airflow1.9 
>            Reporter: yangming
>            Priority: Major
>
> Our Airflow 1.9 is deployed in Docker with conda (Python 2.7). The executor is Celery and the database is AWS RDS MySQL 5.7. We got an error:
> [2019-12-09 02:23:40,831] \{base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
> [2019-12-09 02:23:40,832] \{base_task_runner.py:98} INFO - Subtask: File "/opt/conda/bin/airflow", line 7, in <module>
> [2019-12-09 02:23:40,832] \{base_task_runner.py:98} INFO - Subtask: exec(compile(f.read(), __file__, 'exec'))
> [2019-12-09 02:23:40,832] \{base_task_runner.py:98} INFO - Subtask: File "/src/airflow/airflow/bin/airflow", line 27, in <module>
> [2019-12-09 02:23:40,833] \{base_task_runner.py:98} INFO - Subtask: args.func(args)
> [2019-12-09 02:23:40,833] \{base_task_runner.py:98} INFO - Subtask: File "/src/airflow/airflow/bin/cli.py", line 392, in run
> [2019-12-09 02:23:40,835] \{base_task_runner.py:98} INFO - Subtask: pool=args.pool,
> [2019-12-09 02:23:40,835] \{base_task_runner.py:98} INFO - Subtask: File "/src/airflow/airflow/utils/db.py", line 50, in wrapper
> [2019-12-09 02:23:40,836] \{base_task_runner.py:98} INFO - Subtask: result = func(*args, **kwargs)
> [2019-12-09 02:23:40,836] \{base_task_runner.py:98} INFO - Subtask: File "/src/airflow/airflow/models.py", line 1533, in _run_raw_task
> [2019-12-09 02:23:40,840] \{base_task_runner.py:98} INFO - Subtask: self.handle_failure(e, test_mode, context)
> [2019-12-09 02:23:40,840] \{base_task_runner.py:98} INFO - Subtask: File "/src/airflow/airflow/models.py", line 1642, in handle_failure
> [2019-12-09 02:23:40,840] \{base_task_runner.py:98} INFO - Subtask: session.merge(self)
> [2019-12-09 02:23:40,841] \{base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2093, in merge
> [2019-12-09 02:23:40,841] \{base_task_runner.py:98} INFO - Subtask: _resolve_conflict_map=_resolve_conflict_map,
> [2019-12-09 02:23:40,841] \{base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2166, in _merge
> [2019-12-09 02:23:40,842] \{base_task_runner.py:98} INFO - Subtask: merged = self.query(mapper.class_).get(key[1])
> [2019-12-09 02:23:40,842] \{base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 959, in get
> [2019-12-09 02:23:40,843] \{base_task_runner.py:98} INFO - Subtask: return self._get_impl(ident, loading.load_on_pk_identity)
> [2019-12-09 02:23:40,843] \{base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 1069, in _get_impl
> [2019-12-09 02:23:40,844] \{base_task_runner.py:98} INFO - Subtask: return db_load_fn(self, primary_key_identity)
> [2019-12-09 02:23:40,844] \{base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 282, in load_on_pk_identity
> [2019-12-09 02:23:40,845] \{base_task_runner.py:98} INFO - Subtask: return q.one()
> [2019-12-09 02:23:40,845] \{base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 3282, in one
> [2019-12-09 02:23:40,846] \{base_task_runner.py:98} INFO - Subtask: ret = self.one_or_none()
> [2019-12-09 02:23:40,846] \{base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 3251, in one_or_none
> [2019-12-09 02:23:40,846] \{base_task_runner.py:98} INFO - Subtask: ret = list(self)
> [2019-12-09 02:23:40,847] \{base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 3324, in __iter__
> [2019-12-09 02:23:40,859] \{base_task_runner.py:98} INFO - Subtask: return self._execute_and_instances(context)
> [2019-12-09 02:23:40,860] \{base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 3346, in _execute_and_instances
> [2019-12-09 02:23:40,865] \{base_task_runner.py:98} INFO - Subtask: querycontext, self._connection_from_session, close_with_result=True
> [2019-12-09 02:23:40,866] \{base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 3361, in _get_bind_args
> [2019-12-09 02:23:40,870] \{base_task_runner.py:98} INFO - Subtask: mapper=self._bind_mapper(), clause=querycontext.statement, **kw
> [2019-12-09 02:23:40,870] \{base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 3339, in _connection_from_session
> [2019-12-09 02:23:40,871] \{base_task_runner.py:98} INFO - Subtask: conn = self.session.connection(**kw)
> [2019-12-09 02:23:40,872] \{base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1124, in connection
> [2019-12-09 02:23:40,876] \{base_task_runner.py:98} INFO - Subtask: execution_options=execution_options,
> [2019-12-09 02:23:40,877] \{base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1130, in _connection_for_bind
> [2019-12-09 02:23:40,881] \{base_task_runner.py:98} INFO - Subtask: engine, execution_options
> [2019-12-09 02:23:40,882] \{base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 408, in _connection_for_bind
> [2019-12-09 02:23:40,883] \{base_task_runner.py:98} INFO - Subtask: self._assert_active()
> [2019-12-09 02:23:40,884] \{base_task_runner.py:98} INFO - Subtask: File "/opt/conda/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 295, in _assert_active
> [2019-12-09 02:23:40,889] \{base_task_runner.py:98} INFO - Subtask: code="7s2a",
> [2019-12-09 02:23:40,907] \{base_task_runner.py:98} INFO - Subtask: sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (_mysql_exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
> [2019-12-09 02:23:40,911] \{base_task_runner.py:98} INFO - Subtask: [SQL: UPDATE task_instance SET state=%s WHERE task_instance.task_id = %s AND task_instance.dag_id = %s AND task_instance.execution_date = %s]
> [2019-12-09 02:23:40,916] \{base_task_runner.py:98} INFO - Subtask: [parameters: (u'queued', 'before_gs_delete', 'prod_Turner-Cartoon_daily_etl_v1.1.9.4.0.prod_Turner-Cartoon_daily_etl_v1.1.9.4.0_ingest_bigquery_subdag', datetime.datetime(2019, 12, 7, 5, 30))]
> [2019-12-09 02:23:40,933] \{base_task_runner.py:98} INFO - Subtask: (Background on this error at: http://sqlalche.me/e/e3q8) (Background on this error at: http://sqlalche.me/e/7s2a)
>  
> I have no idea how to solve this. My configuration is as follows:
> [core]
> dags_folder = /usr/local/airflow/dags
> base_log_folder = /usr/local/airflow/logs
> remote_log_conn_id =
> encrypt_s3_logs = False
> logging_level = INFO
> # logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
> logging_config_class =
> log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
> simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
> executor = CeleryExecutor
> sql_alchemy_conn = mysql://airflow:airflow@xxxx:3306/airflow
> sql_alchemy_pool_size = 5
> sql_alchemy_pool_recycle = 3600
> parallelism = 64
> dag_concurrency = 16
> # Are DAGs paused by default at creation
> dags_are_paused_at_creation = True
> non_pooled_task_slot_count = 128
> max_active_runs_per_dag = 16
> load_examples = False
> plugins_folder = /usr/local/airflow/plugins
> fernet_key = 46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
> donot_pickle = False
> dagbag_import_timeout = 30
> task_runner = BashTaskRunner
> default_impersonation =
> security =
> unit_test_mode = False
> task_log_reader = file.task
> enable_xcom_pickling = True
> killed_task_cleanup_time = 60
> [cli]
> api_client = airflow.api.client.local_client
> endpoint_url = http://localhost:8080
> [api]
> auth_backend = airflow.api.auth.backend.default
> [operators]
> default_owner = Airflow
> default_cpus = 1
> default_ram = 512
> default_disk = 512
> default_gpus = 0
> [webserver]
> base_url = http://localhost:8080
> web_server_host = 0.0.0.0
> web_server_ssl_cert =
> web_server_ssl_key =
> web_server_worker_timeout = 120
> worker_refresh_batch_size = 1
> worker_refresh_interval = 30
> secret_key = temporary_key
> workers = 4
> worker_class = sync
> access_logfile = -
> error_logfile = -
> expose_config = True
> authenticate = False
> filter_by_owner = False
> owner_mode = user
> dag_default_view = tree
> dag_orientation = LR
> demo_mode = False
> log_fetch_timeout_sec = 5
> hide_paused_dags_by_default = True
> page_size = 100
> [email]
> email_backend = airflow.utils.email.send_email_smtp
> [smtp]
> smtp_host = xxxx
> smtp_starttls = True
> smtp_ssl = False
> smtp_user = xxxx
> smtp_password=xxxxx
> smtp_port = 25
> smtp_mail_from = xxxx
> [celery]
> celeryd_concurrency = 16
> broker_url = redis://xxxx:6379/1
> flower_host = 0.0.0.0
> flower_port = 5555
> default_queue = default
> celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
> [dask]
> cluster_address = 127.0.0.1:8786
> [scheduler]
> job_heartbeat_sec = 5
> scheduler_heartbeat_sec = 5
> run_duration = -1
> min_file_process_interval = 0
> dag_dir_list_interval = 300
> print_stats_interval = 30
> child_process_log_directory = /usr/local/airflow/logs/scheduler
> scheduler_zombie_task_threshold = 300
> catchup_by_default = True
> max_tis_per_query = 0
> statsd_on = False
> statsd_host = localhost
> statsd_port = 8125
> statsd_prefix = airflow
> max_threads = 16
> authenticate = False
> [ldap]
> # set this to ldaps://<your.ldap.server>:<port>
> uri =
> user_filter = objectClass=*
> user_name_attr = uid
> group_member_attr = memberOf
> superuser_filter =
> data_profiler_filter =
> bind_user = cn=Manager,dc=example,dc=com
> bind_password = insecure
> basedn = dc=example,dc=com
> cacert = /etc/ca/ldap_ca.crt
> search_scope = LEVEL
> [mesos]
> master = localhost:5050
> framework_name = Airflow
> task_cpu = 1
> task_memory = 256
> checkpoint = False
> failover_timeout = 604800
> authenticate = False
> default_principal = admin
> default_secret = admin
> [kerberos]
> ccache = /tmp/airflow_krb5_ccache
> principal = airflow
> reinit_frequency = 3600
> kinit_path = kinit
> keytab = airflow.keytab
> [github_enterprise]
> api_rev = v3
> [admin]
> hide_sensitive_variable_fields = False
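The traceback above shows two stacked failures: a MySQL deadlock (error 1213) during a flush, and then a second error because the same session was reused without calling Session.rollback() first. The sketch below models only that state machine, as a rough illustration of the "roll back, then retry" pattern the error message asks for. It does not use SQLAlchemy or Airflow: FakeSession, DeadlockError, SessionInactiveError and update_with_retry are hypothetical names invented for this example, and this is not how Airflow itself handles the failure.

```python
class DeadlockError(Exception):
    """Hypothetical stand-in for MySQL error 1213 surfacing during a flush."""


class SessionInactiveError(Exception):
    """Hypothetical stand-in for the 'transaction has been rolled back' error."""


class FakeSession:
    """Toy session: after a failed write it refuses all work until rollback()."""

    def __init__(self, deadlocks_before_success):
        self._remaining_deadlocks = deadlocks_before_success
        self._needs_rollback = False
        self.rollbacks = 0
        self.state = None

    def update_state(self, new_state):
        # Mirrors the second error in the report: the session is unusable
        # until the caller explicitly rolls back.
        if self._needs_rollback:
            raise SessionInactiveError("call rollback() before reusing this session")
        # Mirrors the first error: the database reports a deadlock.
        if self._remaining_deadlocks > 0:
            self._remaining_deadlocks -= 1
            self._needs_rollback = True
            raise DeadlockError("Deadlock found when trying to get lock")
        self.state = new_state

    def rollback(self):
        self._needs_rollback = False
        self.rollbacks += 1


def update_with_retry(session, new_state, max_attempts=3):
    """Retry the write after a deadlock, rolling back before every retry.

    Returns the number of attempts that were needed.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            session.update_state(new_state)
            return attempt
        except DeadlockError:
            # Always roll back, even on the final attempt, so the session
            # is left usable for whatever error handling runs next.
            session.rollback()
            if attempt == max_attempts:
                raise
```

With a real SQLAlchemy session the equivalent of the rollback step would be a call to session.rollback() in the exception handler; the retry count and the decision to retry at all are assumptions of this sketch, not something the report or the maintainers prescribe.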



--
This message was sent by Atlassian Jira
(v8.3.4#803005)