You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/01/24 10:18:34 UTC

[GitHub] [airflow] BasPH commented on a change in pull request #20976: Add max_map_size to limit XCom task mapping size

BasPH commented on a change in pull request #20976:
URL: https://github.com/apache/airflow/pull/20976#discussion_r790592809



##########
File path: tests/models/test_taskinstance.py
##########
@@ -2285,13 +2286,32 @@ def push_something():
             push_something()
 
         ti = dag_maker.create_dagrun().task_instances[0]
-        with pytest.raises(UnmappableXComPushed) as ctx:
+        with pytest.raises(UnmappableXComTypePushed) as ctx:
             self._run_ti_with_faked_mapped_dependants(ti)
 
         assert dag_maker.session.query(TaskMap).count() == 0
         assert ti.state == TaskInstanceState.FAILED
         assert str(ctx.value) == "unmappable return type 'str'"
 
+    @conf_vars({("core", "max_map_length"): "1"})
+    def test_error_if_unmappable_length(self, dag_maker):
+        """If an unmappable return value is used to map, fail the task that pushed the XCom."""
+        with dag_maker(dag_id="test_not_recorded_for_unused") as dag:
+
+            @dag.task()
+            def push_something():
+                return [1, 2]
+
+            push_something()
+
+        ti = dag_maker.create_dagrun().task_instances[0]
+        with pytest.raises(UnmappableXComLengthPushed) as ctx:
+            self._run_ti_with_faked_mapped_dependants(ti)
+
+        assert dag_maker.session.query(TaskMap).count() == 0
+        assert ti.state == TaskInstanceState.FAILED
+        assert str(ctx.value) == "unmappable return value size: 2 > 1"

Review comment:
       ```suggestion
           assert str(ctx.value) == "unmappable return value length: 2 > 1"
   ```

##########
File path: airflow/exceptions.py
##########
@@ -110,6 +110,18 @@ def __str__(self) -> str:
         return f"unmappable return type {type(self.value).__qualname__!r}"
 
 
+class UnmappableXComLengthPushed(AirflowException):
+    """Raise when the pushed value is to large to map as a downstream's dependency."""
+
+    def __init__(self, value: Sized, max_length: int) -> None:
+        super().__init__(value)
+        self.value = value
+        self.max_length = max_length
+
+    def __str__(self) -> str:
+        return f"unmappable return value size: {len(self.value)} > {self.max_length}"

Review comment:
       I would rename "size" to "length" in this message for clarity.
   
   ```suggestion
           return f"unmappable return value length: {len(self.value)} > {self.max_length}"
   ```

##########
File path: airflow/exceptions.py
##########
@@ -110,6 +110,18 @@ def __str__(self) -> str:
         return f"unmappable return type {type(self.value).__qualname__!r}"
 
 
+class UnmappableXComLengthPushed(AirflowException):
+    """Raise when the pushed value is to large to map as a downstream's dependency."""

Review comment:
       ```suggestion
       """Raise when the pushed value is too large to map as a downstream's dependency."""
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org