Posted to dev@airflow.apache.org by Maxime Beauchemin <ma...@gmail.com> on 2016/07/11 21:02:29 UTC
Re: pickle exhausted error when pulling from xcom
Hi,
The BLOB type in MySQL is not very large; from my (also insufficient)
memory, it maxes out at 64KB. You probably want to alter the `pickle`
field in your DB to a MEDIUMBLOB or LONGBLOB.
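For reference, a rough sketch of that alter driven from Python (untested;
the DSN is a placeholder for whatever your sql_alchemy_conn points at, and
the same MODIFY works for any other pickle-bearing column):

    from sqlalchemy import create_engine, text

    # placeholder DSN; use the sql_alchemy_conn value from airflow.cfg
    engine = create_engine("mysql://user:pass@host/airflow")
    with engine.connect() as conn:
        # MEDIUMBLOB holds up to 16MB; LONGBLOB goes to 4GB if that is still tight
        conn.execute(text("ALTER TABLE dag_pickle MODIFY pickle MEDIUMBLOB"))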
Max
On Mon, Jul 11, 2016 at 2:24 PM, Clark, Louis <LC...@intrexon.com> wrote:
> Hello,
> Since we switched our Airflow system to using MySQL as a model store, I
> have been getting errors like: _pickle.UnpicklingError: pickle exhausted
> before end of frame. Trace is below. It occurs after an xcom_pull() and
> the trace goes through sqlalchemy. A deeper dive into the DB suggests that
> the pickle is being stored in the dag_pickle.pickle column as a blob, which
> has a max size of 65,535 bytes.
> airflow> desc dag_pickle;
> +--------------+------------+------+-----+---------+----------------+
> | Field        | Type       | Null | Key | Default | Extra          |
> +--------------+------------+------+-----+---------+----------------+
> | id           | int(11)    | NO   | PRI | NULL    | auto_increment |
> | pickle       | blob       | YES  |     | NULL    |                |
> | created_dttm | datetime   | YES  |     | NULL    |                |
> | pickle_hash  | bigint(20) | YES  |     | NULL    |                |
> +--------------+------------+------+-----+---------+----------------+
>
> Any ideas? I wonder if there is an easy way to switch to a mediumblob.
> I'm looking at models.py now.
>
> thanks,
>
> -Louis
>
> [2016-07-11 10:03:48,851] {models.py:1219} INFO - Executing <Task(PythonOperator): pivot> on 2016-07-11 09:23:00
> [2016-07-11 10:03:48,889] {models.py:1286} ERROR - pickle exhausted before end of frame
> Traceback (most recent call last):
>   File "/home/myuser/src/airflow/models.py", line 1245, in run
>     result = task_copy.execute(context=context)
>   File "/home/myuser/src/airflow/operators/python_operator.py", line 66, in execute
>     return_value = self.python_callable(*self.op_args, **self.op_kwargs)
>   File "/home/myuser/src/airflow/dags/runsetMonitor.py", line 425, in updatePivotTables
>     tups = ti.xcom_pull(key='run successes', task_ids=runTaskID)
>   File "/home/myuser/src/airflow/models.py", line 1514, in xcom_pull
>     return pull_fn(task_id=task_ids)
>   File "/home/myuser/src/airflow/utils/db.py", line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/home/myuser/src/airflow/models.py", line 3240, in get_one
>     result = query.first()
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/query.py", line 2659, in first
>     ret = list(self[0:1])
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/query.py", line 2457, in __getitem__
>     return list(res)
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 86, in instances
>     util.raise_from_cause(err)
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
>     raise value
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 74, in instances
>     for row in fetch]
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 74, in <listcomp>
>     for row in fetch]
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 73, in <listcomp>
>     rows = [keyed_tuple([proc(row) for proc in process])
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/sqlalchemy/sql/sqltypes.py", line 1253, in process
>     return loads(value)
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/dill/dill.py", line 260, in loads
>     return load(file)
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/site-packages/dill/dill.py", line 250, in load
>     obj = pik.load()
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/pickle.py", line 1039, in load
>     dispatch[key[0]](self)
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/pickle.py", line 1197, in load_binunicode
>     self.append(str(self.read(len), 'utf-8', 'surrogatepass'))
>   File "/home/myuser/anaconda3/envs/acda35/lib/python3.5/pickle.py", line 234, in read
>     "pickle exhausted before end of frame")
> _pickle.UnpicklingError: pickle exhausted before end of frame
> [2016-07-11 10:03:48,896] {models.py:1306} INFO - Marking task as FAILED.
> [2016-07-11 10:03:48,920] {email.py:96} INFO - Sent an alert email to [redacted]
> [2016-07-11 10:03:48,974] {models.py:1327} ERROR - pickle exhausted before end of frame
Re: pickle exhausted error when pulling from xcom
Posted by "Clark, Louis" <LC...@intrexon.com>.
verified hack solution for this problem:
    alter table xcom modify value MEDIUMBLOB;
-> that alleviates the 'pickle exhausted' error for xcom_pull()
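a quick sanity check from Python that the column actually changed (sketch;
the DSN is a placeholder):

    from sqlalchemy import create_engine, inspect

    engine = create_engine("mysql://user:pass@host/airflow")  # placeholder DSN
    # reflect the xcom table and map each column name to its rendered type
    cols = {c["name"]: str(c["type"]) for c in inspect(engine).get_columns("xcom")}
    print(cols["value"])  # MEDIUMBLOB after the alter; BLOB before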
a longer-term solution might involve pointing PickleType at a larger
LargeBinary (PickleType itself doesn't take a length, but its underlying
impl does) to force SQLAlchemy/MySQL to use something bigger around line
3162 in models.py. See:
http://docs.sqlalchemy.org/en/latest/core/type_basics.html#sqlalchemy.types.LargeBinary
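something like this sketch might do it (untested; MediumPickleType is a
made-up name, and the mechanism is overriding the TypeDecorator impl):

    from sqlalchemy import LargeBinary, PickleType
    from sqlalchemy.dialects import mysql

    class MediumPickleType(PickleType):
        # LargeBinary with an explicit length renders as BLOB(M) on MySQL,
        # and MySQL promotes BLOB(M) to the smallest blob type that fits;
        # 2**24 - 1 bytes lands on MEDIUMBLOB
        impl = LargeBinary(length=2 ** 24 - 1)

    # sanity check: the DDL the MySQL dialect would emit for this type
    print(MediumPickleType().compile(dialect=mysql.dialect()))  # BLOB(16777215)

    # hypothetical swap-in for the XCom value column around models.py:3162:
    #   value = Column(MediumPickleType(pickler=dill))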
Re: pickle exhausted error when pulling from xcom
Posted by "Clark, Louis" <LC...@intrexon.com>.
thanks. Yeah, it looks like the only way to do it is to alter the table
directly. Ugly. The choice of a BLOB vs. something larger appears to be
embedded in SQLAlchemy's PickleType. Celery has had a similar problem:
<https://github.com/celery/celery/issues/461>.
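for what it's worth, the 64KB cap is easy to see without digging far into
the source (quick sketch): PickleType stores through LargeBinary, and the
MySQL dialect renders an unsized LargeBinary as a bare BLOB:

    from sqlalchemy import LargeBinary, PickleType
    from sqlalchemy.dialects import mysql

    print(PickleType.impl)                                 # LargeBinary
    print(LargeBinary().compile(dialect=mysql.dialect()))  # BLOB, i.e. 64KB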
thanks,
-Louis