Posted to commits@airflow.apache.org by "Rodrigo Martins de Oliveira (JIRA)" <ji...@apache.org> on 2019/07/25 11:20:00 UTC

[jira] [Comment Edited] (AIRFLOW-5039) xcom_push execution_date setting is broken when core.enable_xcom_pickling=False

    [ https://issues.apache.org/jira/browse/AIRFLOW-5039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16892659#comment-16892659 ] 

Rodrigo Martins de Oliveira edited comment on AIRFLOW-5039 at 7/25/19 11:19 AM:
--------------------------------------------------------------------------------

[~jackjack10] [~kamil.bregula] I can take the time to discuss an implementation on GitHub and submit a PR. I have worked around this manually by setting a different default encoder for `json`, one that can handle datetime serialization. I'll look through the source code to see whether my workaround can be generalized into a feature without breaking anything; serializing dates is always delicate and can break a lot of things.

For the record, this is the utility function I've been using at the top of my DAGs to work around this in GCP:

{quote}import datetime
import json


class DateTimeEncoder(json.JSONEncoder):
    """JSON Encoder which supports encoding datetime objects"""

    def default(self, obj):  # type: ignore
        """Support datetime encoding"""
        if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
            return obj.isoformat()
        elif isinstance(obj, datetime.timedelta):
            # Rendered as a time of day (HH:MM:SS), so whole days are dropped
            return (datetime.datetime.min + obj).time().isoformat()
        # Let the base class raise TypeError for any other unsupported type
        return super().default(obj)


def set_json_default_encoder() -> None:
    """Sets the default JSON encoder to DateTimeEncoder"""
    # json._default_encoder is a private attribute of the stdlib json module
    json._default_encoder = DateTimeEncoder()  # type: ignore{quote}
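
As a rough usage sketch (not part of any Airflow API), the snippet below compares two ways of applying such an encoder: passing it explicitly with `cls=`, and swapping the private `json._default_encoder` as the workaround above does. The encoder is repeated in trimmed form so the sketch runs on its own; the `stamp` value and the variable names are made up for illustration. The swap relies on CPython's `json.dumps` using the module-level encoder only when it is called without extra keyword arguments, which is an implementation detail rather than a documented guarantee.

```python
import datetime
import json


class DateTimeEncoder(json.JSONEncoder):
    """Trimmed copy of the encoder above, so this sketch is self-contained."""

    def default(self, obj):
        if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
            return obj.isoformat()
        # Defer to the base class, which raises TypeError for unknown types
        return super().default(obj)


# Hypothetical value, chosen only for illustration
stamp = datetime.datetime(2019, 7, 24, 19, 14, 53)

# Option 1: pass the encoder explicitly; nothing global is modified.
explicit = json.dumps({"execution_date": stamp}, cls=DateTimeEncoder)

# Option 2: swap the private module-level encoder, as the workaround does.
original_encoder = json._default_encoder
json._default_encoder = DateTimeEncoder()
try:
    patched = json.dumps({"execution_date": stamp})
finally:
    # Restore the stock encoder so other code is not affected
    json._default_encoder = original_encoder

print(explicit)
print(patched)
```

Option 1 is only possible where the caller controls the `json.dumps` call; the swap is needed here because the call happens inside Airflow.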






> xcom_push execution_date setting is broken when core.enable_xcom_pickling=False
> -------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-5039
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-5039
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: xcom
>    Affects Versions: 1.10.2, 1.10.3
>            Reporter: Rodrigo Martins de Oliveira
>            Priority: Critical
>
> The [xcom_push function in airflow.models.taskinstance automatically passes the execution_date to XCom|https://github.com/apache/airflow/blob/7f66d0a6dc13a790b4d2d31694cb50737983f82e/airflow/models/taskinstance.py#L1336]. This is fine when pickling is enabled, but it does not work when pickling is disabled, because [XCom will attempt to JSON-serialize a `datetime` object|https://github.com/apache/airflow/blob/7f66d0a6dc13a790b4d2d31694cb50737983f82e/airflow/models/xcom.py#L229], which is unsupported.
>
> This issue is critical because enabling pickling poses security risks, and [in environments such as Google Cloud Composer pickling is permanently disabled|https://cloud.google.com/composer/docs/concepts/airflow-configurations].
> Attempting to run DAGs with tasks that push content to XCom when `core.enable_xcom_pickling=False` is set in `airflow.cfg` results in the following stack trace (tested in version 1.10.2-composer):
> {quote}
>  TypeError: Object of type 'datetime' is not JSON serializable
>  [2019-07-24 19:14:53,193] {models.py:1796} ERROR - Object of type 'datetime' is not JSON serializable
>  Traceback (most recent call last):
>  File "/usr/local/lib/airflow/airflow/models.py", line 1668, in _run_raw_task
>  self.xcom_push(key=XCOM_RETURN_KEY, value=result)
>  File "/usr/local/lib/airflow/airflow/models.py", line 2063, in xcom_push
>  execution_date=execution_date or self.execution_date)
>  File "/usr/local/lib/airflow/airflow/utils/db.py", line 73, in wrapper
>  return func(*args, **kwargs)
>  File "/usr/local/lib/airflow/airflow/models.py", line 4785, in set
>  value = json.dumps(value).encode('UTF-8')
>  File "/opt/python3.6/lib/python3.6/json/__init__.py", line 231, in dumps
>  return _default_encoder.encode(obj)
>  File "/opt/python3.6/lib/python3.6/json/encoder.py", line 199, in encode
>  chunks = self.iterencode(o, _one_shot=True)
>  File "/opt/python3.6/lib/python3.6/json/encoder.py", line 257, in iterencode
>  return _iterencode(o, 0)
>  File "/opt/python3.6/lib/python3.6/json/encoder.py", line 180, in default
>  o.__class__.__name__)
>  TypeError: Object of type 'datetime' is not JSON serializable
> {quote}
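
The failing call at the bottom of the trace can be reproduced outside Airflow. This is a minimal sketch using only the standard library: the `value` dictionary and its `pushed_at` key are hypothetical, and the exact wording of the error message varies between Python versions, so the sketch only checks that a `TypeError` is raised.

```python
import datetime
import json

# Hypothetical payload containing a datetime
value = {"pushed_at": datetime.datetime(2019, 7, 24, 19, 14, 53)}

try:
    # Same expression as the last Airflow frame in the traceback
    json.dumps(value).encode("UTF-8")
    error_message = None
except TypeError as exc:
    # The stock encoder has no rule for datetime objects
    error_message = str(exc)

print(error_message)
```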



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)