Posted to commits@airflow.apache.org by di...@apache.org on 2020/08/03 21:07:33 UTC

[airflow] branch v1-10-test updated (ab1d7ec -> 5e3ba14)

This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from ab1d7ec  Enable pretty output in mypy (#9785)
     new 707f995  Fix check_integration pre-commit test (#9869)
     new 5e3ba14  Allow to define custom XCom class (#8560)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/config_templates/config.yml          |  7 ++++
 airflow/config_templates/default_airflow.cfg |  4 +++
 airflow/models/xcom.py                       | 34 ++++++++++++++++++-
 docs/concepts.rst                            |  9 +++++
 tests/models/test_xcom.py                    | 50 ++++++++++++++++++++++++++++
 5 files changed, 103 insertions(+), 1 deletion(-)
 create mode 100644 tests/models/test_xcom.py


[airflow] 02/02: Allow to define custom XCom class (#8560)

Posted by di...@apache.org.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 5e3ba14abfa8d4308bd3c37f2bca2e97fdf04515
Author: Tomek Urbaszek <tu...@gmail.com>
AuthorDate: Tue Apr 28 16:55:05 2020 +0200

    Allow to define custom XCom class (#8560)
    
    * Allow to define custom XCom class
    
    closes: #8059
    (cherry picked from commit 6c6d6611d2aa112a947a9ebc7200446f51d0ac4c)
---
 airflow/config_templates/config.yml          |  7 ++++
 airflow/config_templates/default_airflow.cfg |  4 +++
 airflow/models/xcom.py                       | 34 ++++++++++++++++++-
 docs/concepts.rst                            |  9 +++++
 tests/models/test_xcom.py                    | 50 ++++++++++++++++++++++++++++
 5 files changed, 103 insertions(+), 1 deletion(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index d1c2c90..f54255e 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -476,6 +476,13 @@
       type: string
       example: ~
       default: "True"
+    - name: xcom_backend
+      description: |
+        Path to custom XCom class that will be used to store and resolve operators' results
+      version_added: 1.10.12
+      type: string
+      example: "path.to.CustomXCom"
+      default: "airflow.models.xcom.BaseXCom"
 
 - name: secrets
   description: ~
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index bf83b34..e18e538 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -252,6 +252,10 @@ max_num_rendered_ti_fields_per_task = 30
 # On each dagrun check against defined SLAs
 check_slas = True
 
+# Path to custom XCom class that will be used to store and resolve operators' results
+# Example: xcom_backend = path.to.CustomXCom
+xcom_backend = airflow.models.xcom.BaseXCom
+
 [secrets]
 # Full class name of secrets backend to enable (will precede env vars and metastore in search path)
 # Example: backend = airflow.contrib.secrets.aws_systems_manager.SystemsManagerParameterStoreBackend
diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index f4522b5..0b6a81d 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -40,7 +40,7 @@ MAX_XCOM_SIZE = 49344
 XCOM_RETURN_KEY = 'return_value'
 
 
-class XCom(Base, LoggingMixin):
+class BaseXCom(Base, LoggingMixin):
     """
     Base class for XCom objects.
     """
@@ -232,3 +232,35 @@ class XCom(Base, LoggingMixin):
                       "for XCOM, then you need to enable pickle "
                       "support for XCOM in your airflow config.")
             raise
+
+    @staticmethod
+    def deserialize_value(result) -> Any:
+        # TODO: "pickling" has been deprecated and JSON is preferred.
+        # "pickling" will be removed in Airflow 2.0.
+        enable_pickling = conf.getboolean('core', 'enable_xcom_pickling')
+        if enable_pickling:
+            return pickle.loads(result.value)
+
+        try:
+            return json.loads(result.value.decode('UTF-8'))
+        except ValueError:
+            log.error("Could not deserialize the XCOM value from JSON. "
+                      "If you are using pickles instead of JSON "
+                      "for XCOM, then you need to enable pickle "
+                      "support for XCOM in your airflow config.")
+            raise
+
+
+def resolve_xcom_backend():
+    """Resolves custom XCom class"""
+    clazz = conf.getimport("core", "xcom_backend", fallback=f"airflow.models.xcom.{BaseXCom.__name__}")
+    if clazz:
+        if not issubclass(clazz, BaseXCom):
+            raise TypeError(
+                f"Your custom XCom class `{clazz.__name__}` is not a subclass of `{BaseXCom.__name__}`."
+            )
+        return clazz
+    return BaseXCom
+
+
+XCom = resolve_xcom_backend()
diff --git a/docs/concepts.rst b/docs/concepts.rst
index e85c5b3..dd48003 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -660,6 +660,15 @@ of what this may look like:
 Note that XComs are similar to `Variables`_, but are specifically designed
 for inter-task communication rather than global settings.
 
+Custom XCom backend
+'''''''''''''''''''
+
+It is possible to change how ``XCom`` serializes and deserializes tasks' results.
+To do this, change the ``xcom_backend`` parameter in the Airflow config. The provided value should point
+to a class that is a subclass of :class:`~airflow.models.xcom.BaseXCom`. To alter the serialization /
+deserialization mechanism, the custom class should override the ``serialize_value`` and ``deserialize_value``
+methods.
+
 .. _concepts:variables:
 
 Variables
diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py
new file mode 100644
index 0000000..206b074
--- /dev/null
+++ b/tests/models/test_xcom.py
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from airflow.configuration import conf
+from airflow.models.xcom import BaseXCom, resolve_xcom_backend
+from tests.test_utils.config import conf_vars
+
+
+class CustomXCom(BaseXCom):
+    @staticmethod
+    def serialize_value(_):
+        return "custom_value"
+
+
+class TestXCom:
+    @conf_vars({("core", "xcom_backend"): "tests.models.test_xcom.CustomXCom"})
+    def test_resolve_xcom_class(self):
+        cls = resolve_xcom_backend()
+        assert issubclass(cls, CustomXCom)
+        assert cls().serialize_value(None) == "custom_value"
+
+    @conf_vars(
+        {("core", "xcom_backend"): "", ("core", "enable_xcom_pickling"): "False"}
+    )
+    def test_resolve_xcom_class_fallback_to_basexcom(self):
+        cls = resolve_xcom_backend()
+        assert issubclass(cls, BaseXCom)
+        assert cls().serialize_value([1]) == b"[1]"
+
+    @conf_vars({("core", "enable_xcom_pickling"): "False"})
+    def test_resolve_xcom_class_fallback_to_basexcom_no_config(self):
+        init = conf.get("core", "xcom_backend")
+        conf.remove_option("core", "xcom_backend")
+        cls = resolve_xcom_backend()
+        assert issubclass(cls, BaseXCom)
+        assert cls().serialize_value([1]) == b"[1]"
+        conf.set("core", "xcom_backend", init)
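
The following is a minimal, self-contained sketch of the idea behind this commit, not the code from the patch itself. It assumes a simplified stand-in for BaseXCom with no database model, and the names StoredValue, EnvelopeXCom, validate_backend and resolve_backend are hypothetical. It shows a custom class overriding serialize_value and deserialize_value, and a resolver that imports a class from a dotted path, checks that it subclasses the base, and falls back to the base when no path is configured.

```python
import importlib
import json


class BaseXCom:
    """Hypothetical stand-in for airflow.models.xcom.BaseXCom (no database model)."""

    @staticmethod
    def serialize_value(value):
        # JSON-encode to bytes, similar to the non-pickling path in the diff.
        return json.dumps(value).encode("UTF-8")

    @staticmethod
    def deserialize_value(result):
        # `result` is expected to expose the stored bytes as `.value`.
        return json.loads(result.value.decode("UTF-8"))


class StoredValue:
    """Hypothetical stand-in for a stored XCom row; only `.value` is modelled."""

    def __init__(self, value):
        self.value = value


class EnvelopeXCom(BaseXCom):
    """Example custom backend: wraps each payload in a versioned envelope."""

    @staticmethod
    def serialize_value(value):
        return json.dumps({"version": 1, "data": value}).encode("UTF-8")

    @staticmethod
    def deserialize_value(result):
        return json.loads(result.value.decode("UTF-8"))["data"]


def validate_backend(clazz):
    # Reject anything that is not a subclass of the base class.
    if not issubclass(clazz, BaseXCom):
        raise TypeError(
            f"Custom XCom class `{clazz.__name__}` is not a subclass of `{BaseXCom.__name__}`."
        )
    return clazz


def resolve_backend(dotted_path):
    # An empty or missing setting falls back to the base class.
    if not dotted_path:
        return BaseXCom
    module_path, _, class_name = dotted_path.rpartition(".")
    clazz = getattr(importlib.import_module(module_path), class_name)
    return validate_backend(clazz)
```

In a real deployment the custom class would subclass airflow.models.xcom.BaseXCom and be selected by setting xcom_backend under [core] to its dotted path, as in the example value path.to.CustomXCom from the config template above.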


[airflow] 01/02: Fix check_integration pre-commit test (#9869)

Posted by di...@apache.org.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 707f9955ad29f0d5088312857b051d17254b2c5f
Author: Alexander Sutcliffe <41...@users.noreply.github.com>
AuthorDate: Fri Jul 17 16:16:53 2020 +0200

    Fix check_integration pre-commit test (#9869)
    
    (cherry picked from commit cbfff65ec56674fea524f6dd7b9aeea495e66d12)