You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/15 03:32:36 UTC

[airflow] 01/47: Allow to define custom XCom class (#8560)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 05fc4b24482b9127f54925b859de02b9c533b780
Author: Tomek Urbaszek <tu...@gmail.com>
AuthorDate: Tue Apr 28 16:55:05 2020 +0200

    Allow to define custom XCom class (#8560)
    
    * Allow to define custom XCom class
    
    closes: #8059
    (cherry picked from commit 6c6d6611d2aa112a947a9ebc7200446f51d0ac4c)
---
 airflow/config_templates/config.yml          |  7 ++++
 airflow/config_templates/default_airflow.cfg |  4 +++
 airflow/configuration.py                     | 21 ++++++++++++
 airflow/models/xcom.py                       | 19 ++++++++++-
 docs/concepts.rst                            |  9 +++++
 tests/models/test_xcom.py                    | 50 ++++++++++++++++++++++++++++
 6 files changed, 109 insertions(+), 1 deletion(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index d1c2c90..f54255e 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -476,6 +476,13 @@
       type: string
       example: ~
       default: "True"
+    - name: xcom_backend
+      description: |
+        Path to custom XCom class that will be used to store and resolve operators results
+      version_added: 1.10.12
+      type: string
+      example: "path.to.CustomXCom"
+      default: "airflow.models.xcom.BaseXCom"
 
 - name: secrets
   description: ~
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index bf83b34..e18e538 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -252,6 +252,10 @@ max_num_rendered_ti_fields_per_task = 30
 # On each dagrun check against defined SLAs
 check_slas = True
 
+# Path to custom XCom class that will be used to store and resolve operators results
+# Example: xcom_backend = path.to.CustomXCom
+xcom_backend = airflow.models.xcom.BaseXCom
+
 [secrets]
 # Full class name of secrets backend to enable (will precede env vars and metastore in search path)
 # Example: backend = airflow.contrib.secrets.aws_systems_manager.SystemsManagerParameterStoreBackend
diff --git a/airflow/configuration.py b/airflow/configuration.py
index d912898..8720d77 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -42,6 +42,7 @@ import yaml
 from zope.deprecation import deprecated
 
 from airflow.exceptions import AirflowConfigException
+from airflow.utils.module_loading import import_string
 
 standard_library.install_aliases()
 
@@ -342,6 +343,26 @@ class AirflowConfigParser(ConfigParser):
                 "section/key [{section}/{key}] not found "
                 "in config".format(section=section, key=key))
 
+    def getimport(self, section, key, **kwargs):
+        """
+        Reads the option, imports the fully qualified name it holds, and returns the object.
+        If the import fails, raises AirflowConfigException naming the key and the section.
+        :return: The imported object, or None if the option is empty
+        """
+        full_qualified_path = self.get(section=section, key=key, **kwargs)  # this parser, not global conf
+        if not full_qualified_path:
+            return None
+
+        try:
+            return import_string(full_qualified_path)
+        except ImportError as e:
+            log.error(e)  # log the underlying import error before raising the config error
+            raise AirflowConfigException(
+                'The object could not be loaded. Please check "{key}" key in "{section}" section. '
+                'Current value: "{full_qualified_path}".'.format(
+                    key=key, section=section, full_qualified_path=full_qualified_path)
+            )
+
     def getboolean(self, section, key, **kwargs):
         val = str(self.get(section, key, **kwargs)).lower().strip()
         if '#' in val:
diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index f4522b5..ea902be 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -40,7 +40,7 @@ MAX_XCOM_SIZE = 49344
 XCOM_RETURN_KEY = 'return_value'
 
 
-class XCom(Base, LoggingMixin):
+class BaseXCom(Base, LoggingMixin):
     """
     Base class for XCom objects.
     """
@@ -232,3 +232,20 @@ class XCom(Base, LoggingMixin):
                       "for XCOM, then you need to enable pickle "
                       "support for XCOM in your airflow config.")
             raise
+
+
+def resolve_xcom_backend():
+    """Return the XCom class set in ``[core] xcom_backend``, or ``BaseXCom`` if none is resolved."""
+    default_path = "airflow.models.xcom.{}".format(BaseXCom.__name__)
+    backend = conf.getimport("core", "xcom_backend", fallback=default_path)
+    if not backend:  # nothing was imported, so use the built-in class
+        return BaseXCom
+    if issubclass(backend, BaseXCom):
+        return backend
+    raise TypeError(  # a configured class must extend BaseXCom
+        "Your custom XCom class `{class_name}` is not a subclass of `{base_name}`."
+        .format(class_name=backend.__name__, base_name=BaseXCom.__name__)
+    )
+
+
+XCom = resolve_xcom_backend()
diff --git a/docs/concepts.rst b/docs/concepts.rst
index e85c5b3..dd48003 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -660,6 +660,15 @@ of what this may look like:
 Note that XComs are similar to `Variables`_, but are specifically designed
 for inter-task communication rather than global settings.
 
+Custom XCom backend
+'''''''''''''''''''
+
+It is possible to change ``XCom`` behaviour of serialization and deserialization of tasks' result.
+To do this one has to change ``xcom_backend`` parameter in Airflow config. Provided value should point
+to a class that is a subclass of :class:`~airflow.models.xcom.BaseXCom`. To alter the serialization /
+deserialization mechanism the custom class should override ``serialize_value`` and ``deserialize_value``
+methods.
+
 .. _concepts:variables:
 
 Variables
diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py
new file mode 100644
index 0000000..206b074
--- /dev/null
+++ b/tests/models/test_xcom.py
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from airflow.configuration import conf
+from airflow.models.xcom import BaseXCom, resolve_xcom_backend
+from tests.test_utils.config import conf_vars
+
+
+class CustomXCom(BaseXCom):  # stub backend that the tests below point ``xcom_backend`` at
+    @staticmethod
+    def serialize_value(_):  # the argument is ignored
+        return "custom_value"  # fixed marker the test asserts on
+
+
+class TestXCom:  # covers resolve_xcom_backend() with a custom, an empty and a missing setting
+    @conf_vars({("core", "xcom_backend"): "tests.models.test_xcom.CustomXCom"})
+    def test_resolve_xcom_class(self):  # a configured subclass is the one returned
+        cls = resolve_xcom_backend()
+        assert issubclass(cls, CustomXCom)
+        assert cls().serialize_value(None) == "custom_value"
+
+    @conf_vars({("core", "xcom_backend"): "", ("core", "enable_xcom_pickling"): "False"})
+    def test_resolve_xcom_class_fallback_to_basexcom(self):  # empty option -> BaseXCom
+        cls = resolve_xcom_backend()
+        assert issubclass(cls, BaseXCom)
+        assert cls().serialize_value([1]) == b"[1]"
+
+    @conf_vars({("core", "enable_xcom_pickling"): "False"})
+    def test_resolve_xcom_class_fallback_to_basexcom_no_config(self):  # option absent -> BaseXCom
+        init = conf.get("core", "xcom_backend")
+        conf.remove_option("core", "xcom_backend")  # simulate a config without the option
+        try:
+            cls = resolve_xcom_backend()
+            assert issubclass(cls, BaseXCom)
+            assert cls().serialize_value([1]) == b"[1]"
+        finally:  # restore the option even when a check above fails, so later tests are unaffected
+            conf.set("core", "xcom_backend", init)