Posted to commits@airflow.apache.org by "Nathaniel Ritholtz (Jira)" <ji...@apache.org> on 2020/02/13 19:29:00 UTC

[jira] [Updated] (AIRFLOW-6795) serialized_dag table's data column text type is too small for mysql

     [ https://issues.apache.org/jira/browse/AIRFLOW-6795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nathaniel Ritholtz updated AIRFLOW-6795:
----------------------------------------
    Summary: serialized_dag table's data column text type is too small for mysql  (was: serialized_dag table's data type max length is too small)

> serialized_dag table's data column text type is too small for mysql
> -------------------------------------------------------------------
>
>                 Key: AIRFLOW-6795
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6795
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: serialization
>    Affects Versions: 1.10.9
>            Reporter: Nathaniel Ritholtz
>            Priority: Major
>
> When upgrading to v1.10.9, I tried the new store_serialized_dags flag. However, the scheduler failed with the following traceback:
> {code}
> scheduler_1  | Process DagFileProcessor2163-Process:
> scheduler_1  | Traceback (most recent call last):
> scheduler_1  |   File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
> scheduler_1  |     self.run()
> scheduler_1  |   File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
> scheduler_1  |     self._target(*self._args, **self._kwargs)
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 157, in _run_file_processor
> scheduler_1  |     pickle_dags)
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
> scheduler_1  |     return func(*args, **kwargs)
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1580, in process_file
> scheduler_1  |     dag.sync_to_db()
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
> scheduler_1  |     return func(*args, **kwargs)
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/models/dag.py", line 1514, in sync_to_db
> scheduler_1  |     session=session
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
> scheduler_1  |     return func(*args, **kwargs)
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/models/serialized_dag.py", line 118, in write_dag
> scheduler_1  |     session.merge(cls(dag))
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2113, in merge
> scheduler_1  |     _resolve_conflict_map=_resolve_conflict_map,
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2186, in _merge
> scheduler_1  |     merged = self.query(mapper.class_).get(key[1])
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 1004, in get
> scheduler_1  |     return self._get_impl(ident, loading.load_on_pk_identity)
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 1116, in _get_impl
> scheduler_1  |     return db_load_fn(self, primary_key_identity)
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 284, in load_on_pk_identity
> scheduler_1  |     return q.one()
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3347, in one
> scheduler_1  |     ret = self.one_or_none()
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3316, in one_or_none
> scheduler_1  |     ret = list(self)
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 101, in instances
> scheduler_1  |     util.raise_from_cause(err)
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
> scheduler_1  |     reraise(type(exception), exception, tb=exc_tb, cause=cause)
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
> scheduler_1  |     raise value
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 81, in instances
> scheduler_1  |     rows = [proc(row) for row in fetch]
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 81, in <listcomp>
> scheduler_1  |     rows = [proc(row) for row in fetch]
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 574, in _instance
> scheduler_1  |     populators,
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 695, in _populate_full
> scheduler_1  |     dict_[key] = getter(row)
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/type_api.py", line 1266, in process
> scheduler_1  |     return process_value(impl_processor(value), dialect)
> scheduler_1  |   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/sqltypes.py", line 2407, in process
> scheduler_1  |     return json_deserializer(value)
> scheduler_1  |   File "/usr/local/lib/python3.6/json/__init__.py", line 354, in loads
> scheduler_1  |     return _default_decoder.decode(s)
> scheduler_1  |   File "/usr/local/lib/python3.6/json/decoder.py", line 339, in decode
> scheduler_1  |     obj, end = self.raw_decode(s, idx=_w(s, 0).end())
> scheduler_1  |   File "/usr/local/lib/python3.6/json/decoder.py", line 355, in raw_decode
> scheduler_1  |     obj, end = self.scan_once(s, idx)
> {code}
> After further investigation, I found that the cause was my use of the airflow-db-cleanup DAG (https://github.com/teamclairvoyant/airflow-maintenance-dags/blob/master/db-cleanup/airflow-db-cleanup.py). In this DAG, the params passed to the PythonOperators include a dict whose values are objects such as DagRun, so the serialized form of the DAG is quite large. When I looked at the data column of this DAG's record in the serialized_dag table, I saw that the data was cut off partway through the DAG and that its length was exactly 65535 characters. The data column is of type TEXT, which in MySQL holds at most 65,535 bytes. I assume the process that stores the serialized DAG was therefore forced to truncate it in the middle of the serialization, which leaves invalid JSON that fails to deserialize when the row is read back.
> Would it be possible to change the column from TEXT to MEDIUMTEXT? I made that change locally on my MySQL database, and the DAG was then serialized and deserialized successfully.
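
The failure mode described above can be reproduced without a database. The following is a minimal sketch, not Airflow code: it assumes the server cut the value silently at the TEXT limit (as MySQL can do when strict SQL mode is off), and the helper names `fake_serialized_dag`, `smallest_fitting_type`, and `survives_text_column` are hypothetical, introduced only for illustration. The byte limits are the documented capacities of the MySQL text types.

```python
import json

# Maximum capacity of each MySQL text type, in bytes.
MYSQL_TEXT_LIMITS = {
    "TINYTEXT": 2**8 - 1,
    "TEXT": 2**16 - 1,        # 65,535
    "MEDIUMTEXT": 2**24 - 1,  # 16,777,215
    "LONGTEXT": 2**32 - 1,
}


def fake_serialized_dag(n_tasks: int) -> str:
    """Build a hypothetical JSON blob standing in for a serialized DAG."""
    tasks = [
        {"task_id": f"task_{i}", "params": {"note": "x" * 100}}
        for i in range(n_tasks)
    ]
    return json.dumps({"dag_id": "example", "tasks": tasks})


def smallest_fitting_type(payload: str) -> str:
    """Return the smallest MySQL text type whose byte limit holds the payload."""
    size = len(payload.encode("utf-8"))
    for name, limit in MYSQL_TEXT_LIMITS.items():
        if size <= limit:
            return name
    raise ValueError("payload exceeds LONGTEXT capacity")


def survives_text_column(payload: str) -> bool:
    """Simulate a silent cut at the TEXT limit, then try to decode the result."""
    limit = MYSQL_TEXT_LIMITS["TEXT"]
    # Assumption: the value is cut at the byte limit without raising an error.
    stored = payload.encode("utf-8")[:limit].decode("utf-8", errors="ignore")
    try:
        json.loads(stored)
    except json.JSONDecodeError:
        return False
    return True


if __name__ == "__main__":
    for n_tasks in (10, 1000):
        blob = fake_serialized_dag(n_tasks)
        print(
            n_tasks,
            len(blob.encode("utf-8")),
            smallest_fitting_type(blob),
            survives_text_column(blob),
        )
```

In this sketch, a payload under the TEXT limit decodes normally, while a larger one is cut before its closing brackets and can no longer be parsed as JSON. This matches the traceback ending inside the JSON decoder. The same payload fits in MEDIUMTEXT, which is consistent with the local fix reported above.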


