You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/09/07 19:48:37 UTC

[GitHub] jlowin closed pull request #2048: [AIRFLOW-828] Add maximum XCom size

jlowin closed pull request #2048: [AIRFLOW-828] Add maximum XCom size
URL: https://github.com/apache/incubator-airflow/pull/2048
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, since GitHub does not otherwise display it:

diff --git a/airflow/configuration.py b/airflow/configuration.py
index 6752bdb283..927e6110f8 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -200,6 +200,10 @@ def run_command(command):
 default_disk = 512
 default_gpus = 0
 
+[xcom]
+
+# the maximum size of a pickled XCom object, in bytes
+max_size = 20000
 
 [webserver]
 # The base url of your website as airflow cannot guess what domain or
diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index 22312083d8..e65d907416 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -22,7 +22,7 @@ class AirflowException(Exception):
 
 class AirflowConfigException(AirflowException):
     pass
-    
+
 
 class AirflowSensorTimeout(AirflowException):
     pass
@@ -34,3 +34,7 @@ class AirflowTaskTimeout(AirflowException):
 
 class AirflowSkipException(AirflowException):
     pass
+
+
+class XComException(AirflowException):
+    pass
diff --git a/airflow/models.py b/airflow/models.py
index 6cf7ad9dee..8911242c1f 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -61,7 +61,8 @@
 from airflow import settings, utils
 from airflow.executors import DEFAULT_EXECUTOR, LocalExecutor
 from airflow import configuration
-from airflow.exceptions import AirflowException, AirflowSkipException, AirflowTaskTimeout
+from airflow.exceptions import (
+    AirflowException, AirflowSkipException, AirflowTaskTimeout, XComException)
 from airflow.dag.base_dag import BaseDag, BaseDagBag
 from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
 from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
@@ -3600,6 +3601,15 @@ def set(
         """
         session.expunge_all()
 
+        # check XCom size
+        max_xcom_size = configuration.getint('XCOM', 'MAX_SIZE')
+        xcom_size = sys.getsizeof(pickle.dumps(value))
+        if xcom_size > max_xcom_size:
+            raise XComException(
+                "The XCom's pickled size ({} bytes) is larger than the "
+                "maximum allowed size ({} bytes).".format(
+                    xcom_size, max_xcom_size))
+
         # remove any duplicate XComs
         session.query(cls).filter(
             cls.key == key,
diff --git a/tests/models.py b/tests/models.py
index 346f47cfab..31b8f80b93 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -23,7 +23,7 @@
 import time
 
 from airflow import models, settings, AirflowException
-from airflow.exceptions import AirflowSkipException
+from airflow.exceptions import AirflowSkipException, XComException
 from airflow.models import DAG, TaskInstance as TI
 from airflow.models import State as ST
 from airflow.models import DagModel
@@ -581,6 +581,31 @@ def test_check_task_dependencies(self, trigger_rule, successes, skipped,
         self.assertEqual(completed, expect_completed)
         self.assertEqual(ti.state, expect_state)
 
+
+class XComTest(unittest.TestCase):
+
+    def test_xcom_max_size(self):
+        """
+        Test that pushing large XComs raises an error
+        """
+        small_value = [0] * 100
+        large_value = [0] * 1000000
+
+        dag = models.DAG(dag_id='test_xcom')
+        task = DummyOperator(
+            task_id='test_xcom',
+            dag=dag,
+            owner='airflow',
+            start_date=datetime.datetime(2016, 6, 2, 0, 0, 0))
+        ti = TI(task=task, execution_date=datetime.datetime.now())
+
+        # this should work
+        ti.xcom_push(key='small xcom', value=small_value)
+
+        # this should fail
+        with self.assertRaises(XComException):
+            ti.xcom_push(key='large xcom', value=large_value)
+
     def test_xcom_pull_after_success(self):
         """
         tests xcom set/clear relative to a task in a 'success' rerun scenario


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services