Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/25 13:36:46 UTC

[GitHub] [airflow] ashb opened a new pull request #19825: Deprecate passing execution_date to XCom methods

ashb opened a new pull request #19825:
URL: https://github.com/apache/airflow/pull/19825


   As part of AIP-39 (released in 2.2), we added `run_id` parameters to XCom
   methods; this PR makes passing `run_id` the recommended approach.
   
   A future PR will change the columns on the xcom table to store run_id
   (instead of, or as well as, execution_date), but that will be for 2.3,
   whereas this change can be backported to 2.2.x.
   
   Discussion thread: https://lists.apache.org/thread/gofj3g6m6vvksy6n0cmgq1qxd309bbbl
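A minimal sketch of the deprecation pattern the description outlines: a lookup that accepts either `run_id` or the legacy `execution_date`, warns on the latter, and resolves the date to the run it identifies. `get_xcom`, `RUNS` and `STORE` are hypothetical in-memory stand-ins, not Airflow's actual `XCom` API.

```python
import datetime
import warnings
from typing import Any, Dict, Optional, Tuple

# Hypothetical stand-in for the dag_run table: (dag_id, execution_date) -> run_id.
RUNS: Dict[Tuple[str, datetime.datetime], str] = {}
# Hypothetical stand-in for the xcom table: (dag_id, task_id, run_id, key) -> value.
STORE: Dict[Tuple[str, str, str, str], Any] = {}


def get_xcom(
    *,
    dag_id: str,
    task_id: str,
    key: str,
    run_id: Optional[str] = None,
    execution_date: Optional[datetime.datetime] = None,
) -> Any:
    """Fetch an XCom value by run_id; execution_date is still accepted but deprecated."""
    if (run_id is None) == (execution_date is None):
        raise TypeError("Exactly one of run_id or execution_date must be given")
    if execution_date is not None:
        warnings.warn(
            "Passing 'execution_date' is deprecated; pass 'run_id' instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        # Translate the legacy date into the run it identifies.
        run_id = RUNS[(dag_id, execution_date)]
    return STORE.get((dag_id, task_id, run_id, key))
```

Keeping both parameters keyword-only means old call sites keep working (with a warning) while new ones can switch to `run_id` one at a time.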


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #19825: Deprecate passing execution_date to XCom methods

ashb commented on a change in pull request #19825:
URL: https://github.com/apache/airflow/pull/19825#discussion_r756899936



##########
File path: tests/models/test_xcom.py
##########
@@ -217,24 +216,24 @@ def test_xcom_init_on_load_uses_orm_deserialize_value(self, mock_orm_deserialize
         mock_orm_deserialize.assert_called_once_with()
 
     @conf_vars({("core", "xcom_backend"): "tests.models.test_xcom.CustomXCom"})
-    def test_get_one_doesnt_use_orm_deserialize_value(self, session):
+    def test_get_one_doesnt_use_orm_deserialize_value(self, dag_run, session):
         """Test that XCom.get_one does not call orm_deserialize_value"""
         json_obj = {"key": "value"}
-        execution_date = timezone.utcnow()
-        key = XCOM_RETURN_KEY
-        dag_id = "test_dag"
         task_id = "test_task"
 
         XCom = resolve_xcom_backend()
         XCom.set(
-            key=key,
+            key=XCOM_RETURN_KEY,
             value=json_obj,
-            dag_id=dag_id,
+            dag_id=dag_run.dag_id,
             task_id=task_id,
-            execution_date=execution_date,
+            run_id=dag_run.run_id,
             session=session,
         )
 
-        value = XCom.get_one(dag_id=dag_id, task_id=task_id, execution_date=execution_date, session=session)
+        value = XCom.get_one(dag_id=dag_run.dag_id, task_id=task_id, run_id=dag_run.run_id, session=session)
 
         assert value == json_obj
+
+
+# TODO: Add a test of `include_prior_dates` at all! And when given a run_id or a execution_date

Review comment:
       Not finished yet :)







[GitHub] [airflow] potiuk commented on a change in pull request #19825: Deprecate passing execution_date to XCom methods

potiuk commented on a change in pull request #19825:
URL: https://github.com/apache/airflow/pull/19825#discussion_r760504129



##########
File path: airflow/models/xcom.py
##########
@@ -79,37 +81,112 @@ def init_on_load(self):
     def __repr__(self):
         return f'<XCom "{self.key}" ({self.task_id} @ {self.execution_date})>'
 
+    @overload

Review comment:
       NICE. TIL https://docs.python.org/3/library/typing.html#typing.overload!
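For readers new to `typing.overload`: it lets one runtime implementation advertise several type-checked signatures. A minimal sketch, assuming a hypothetical module-level `get_one` (not the code in this PR), of how the `run_id` form and the legacy `execution_date` form could be declared so that a type checker rejects calls passing both or neither:

```python
import datetime
from typing import Optional, Tuple, overload


@overload
def get_one(*, dag_id: str, task_id: str, key: str, run_id: str) -> Tuple[str, object]: ...


@overload
def get_one(
    *, dag_id: str, task_id: str, key: str, execution_date: datetime.datetime
) -> Tuple[str, object]: ...


def get_one(
    *,
    dag_id: str,
    task_id: str,
    key: str,
    run_id: Optional[str] = None,
    execution_date: Optional[datetime.datetime] = None,
) -> Tuple[str, object]:
    # Only this implementation exists at runtime; the two @overload stubs above
    # are read by type checkers such as mypy, which flag a call that passes
    # both run_id and execution_date, or neither.
    if (run_id is None) == (execution_date is None):
        raise TypeError("pass exactly one of run_id or execution_date")
    # Report which lookup path was taken (stand-in for a real query).
    if run_id is not None:
        return ("run_id", run_id)
    return ("execution_date", execution_date)
```

The runtime check is still needed because overloads are erased at runtime and untyped callers bypass them.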







[GitHub] [airflow] uranusjr commented on pull request #19825: Deprecate passing execution_date to XCom methods

uranusjr commented on pull request #19825:
URL: https://github.com/apache/airflow/pull/19825#issuecomment-988007134


   Oh everything is 🟢 now!





[GitHub] [airflow] uranusjr commented on a change in pull request #19825: Deprecate passing execution_date to XCom methods

uranusjr commented on a change in pull request #19825:
URL: https://github.com/apache/airflow/pull/19825#discussion_r760144162



##########
File path: airflow/models/taskinstance.py
##########
@@ -2237,27 +2237,40 @@ def xcom_push(
         :param value: A value for the XCom. The value is pickled and stored
             in the database.
         :type value: any picklable object
-        :param execution_date: if provided, the XCom will not be visible until
-            this date. This can be used, for example, to send a message to a
-            task on a future date without it being immediately visible.
+        :param execution_date: Deprecated parameter.
         :type execution_date: datetime
         :param session: Sqlalchemy ORM Session
         :type session: Session
         """
-        self_execution_date = self.get_dagrun(session).execution_date
-        if execution_date and execution_date < self_execution_date:
-            raise ValueError(
-                f'execution_date can not be in the past (current execution_date is '
-                f'{self_execution_date}; received {execution_date})'
-            )
+        run_id = None
+        if execution_date:
+            self_execution_date = self.get_dagrun(session).execution_date
+            if execution_date < self_execution_date:
+                raise ValueError(
+                    f'execution_date can not be in the past (current execution_date is '
+                    f'{self_execution_date}; received {execution_date})'
+                )
+            elif execution_date != self_execution_date:
+                warnings.warn(
+                    "Passing `execution_date` parameter to xcom_push is deprecated.",
+                    DeprecationWarning,
+                    stacklevel=3,
+                )

Review comment:
       I made this unconditionally warn instead.
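A minimal sketch of what "unconditionally warn" could look like, written as a hypothetical standalone helper `check_push_date` rather than the real `TaskInstance.xcom_push` body: any explicit `execution_date` now triggers the `DeprecationWarning`, and the existing past-date `ValueError` is kept. Emitting the warning before the check is an assumption of this sketch.

```python
import datetime
import warnings
from typing import Optional


def check_push_date(
    execution_date: Optional[datetime.datetime],
    run_date: datetime.datetime,
) -> None:
    """Validate a legacy execution_date argument against the run's own date."""
    if execution_date is None:
        return  # The recommended path: no date passed, nothing to warn about.
    # Warn whenever a date is passed, not only when it differs from run_date.
    # stacklevel=3 assumes the chain: this helper -> xcom_push -> user code.
    warnings.warn(
        "Passing `execution_date` parameter to xcom_push is deprecated.",
        DeprecationWarning,
        stacklevel=3,
    )
    if execution_date < run_date:
        raise ValueError(
            f"execution_date can not be in the past (current execution_date is "
            f"{run_date}; received {execution_date})"
        )
```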







[GitHub] [airflow] ashb commented on a change in pull request #19825: Deprecate passing execution_date to XCom methods

ashb commented on a change in pull request #19825:
URL: https://github.com/apache/airflow/pull/19825#discussion_r756920704



##########
File path: airflow/models/taskinstance.py
##########
@@ -2237,27 +2237,40 @@ def xcom_push(
         :param value: A value for the XCom. The value is pickled and stored
             in the database.
         :type value: any picklable object
-        :param execution_date: if provided, the XCom will not be visible until
-            this date. This can be used, for example, to send a message to a
-            task on a future date without it being immediately visible.
+        :param execution_date: Deprecated parameter.
         :type execution_date: datetime
         :param session: Sqlalchemy ORM Session
         :type session: Session
         """
-        self_execution_date = self.get_dagrun(session).execution_date
-        if execution_date and execution_date < self_execution_date:
-            raise ValueError(
-                f'execution_date can not be in the past (current execution_date is '
-                f'{self_execution_date}; received {execution_date})'
-            )
+        run_id = None
+        if execution_date:
+            self_execution_date = self.get_dagrun(session).execution_date
+            if execution_date < self_execution_date:
+                raise ValueError(
+                    f'execution_date can not be in the past (current execution_date is '
+                    f'{self_execution_date}; received {execution_date})'
+                )
+            elif execution_date != self_execution_date:
+                warnings.warn(
+                    "Passing `execution_date` parameter to xcom_push is deprecated.",
+                    DeprecationWarning,
+                    stacklevel=3,
+                )

Review comment:
       This could possibly be extended to warn whenever a date is passed at all.







[GitHub] [airflow] ashb commented on a change in pull request #19825: Deprecate passing execution_date to XCom methods

ashb commented on a change in pull request #19825:
URL: https://github.com/apache/airflow/pull/19825#discussion_r760166393



##########
File path: airflow/models/xcom.py
##########
@@ -79,37 +81,112 @@ def init_on_load(self):
     def __repr__(self):
         return f'<XCom "{self.key}" ({self.task_id} @ {self.execution_date})>'
 
+    @overload

Review comment:
       Oh nice approach!







[GitHub] [airflow] uranusjr commented on pull request #19825: Deprecate passing execution_date to XCom methods

uranusjr commented on pull request #19825:
URL: https://github.com/apache/airflow/pull/19825#issuecomment-985268177


   I’ve made all tests work and migrated most existing XCom usages to `run_id`. One (very big) exception is operator links: since `get_link` is publicly subclassable, we can’t just change its argument. The current plan (after discussing with @ashb) is to add an optional flag on the class to toggle the behaviour, but there is a lot of detail (and likely bikeshedding) around that, so I’m going to do it in a separate PR. The deprecation warning this PR emits when `execution_date` is passed to XCom methods is changed from DeprecationWarning to PendingDeprecationWarning to reflect that: we won’t commit to the deprecation until we provide a migration path for operator links (which should still happen in 2.3, but let’s not jump the gun).
   
   I also want to extract the typing changes I made here into a separate PR and merge that first, before marking this PR as ready for review.
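A minimal sketch of the class-flag idea, under loud assumptions: `LinkSketch`, `uses_run_id`, `resolve_link` and the example.com URLs are all hypothetical, since the comment defers the real flag's name and shape to a later PR. Legacy subclasses keep receiving a date and trigger a `PendingDeprecationWarning`, which Python's default warning filters ignore, so end users are not nagged before a migration path exists. pytest's `deprecated_call()` accepts `PendingDeprecationWarning` as well as `DeprecationWarning`, which is presumably why the tests in this PR keep passing after the switch.

```python
import datetime
import warnings
from typing import Union

LinkRef = Union[str, datetime.datetime]


class LinkSketch:
    """Hypothetical stand-in for a publicly subclassable operator-link base class."""

    # Hypothetical opt-in flag: subclasses that understand run_id set this to True.
    uses_run_id: bool = False

    def get_link(self, operator: str, ref: LinkRef) -> str:
        raise NotImplementedError


class LegacyLink(LinkSketch):
    # An existing subclass written against dates; it keeps working unchanged.
    def get_link(self, operator: str, ref: LinkRef) -> str:
        assert isinstance(ref, datetime.datetime)
        return f"https://example.com/{operator}?date={ref.isoformat()}"


class RunIdLink(LinkSketch):
    uses_run_id = True

    def get_link(self, operator: str, ref: LinkRef) -> str:
        return f"https://example.com/{operator}?run_id={ref}"


def resolve_link(link: LinkSketch, operator: str, run_id: str, date: datetime.datetime) -> str:
    """Call get_link with whichever reference the subclass says it understands."""
    if link.uses_run_id:
        return link.get_link(operator, run_id)
    warnings.warn(
        f"{type(link).__name__} still receives execution_date; opt in to run_id",
        PendingDeprecationWarning,
        stacklevel=2,
    )
    return link.get_link(operator, date)
```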





[GitHub] [airflow] uranusjr commented on a change in pull request #19825: Deprecate passing execution_date to XCom methods

uranusjr commented on a change in pull request #19825:
URL: https://github.com/apache/airflow/pull/19825#discussion_r761012500



##########
File path: tests/models/test_xcom.py
##########
@@ -212,29 +146,231 @@ def test_xcom_init_on_load_uses_orm_deserialize_value(self, mock_orm_deserialize
             task_id="task_id",
             dag_id="dag_id",
         )
-
         instance.init_on_load()
         mock_orm_deserialize.assert_called_once_with()
 
     @conf_vars({("core", "xcom_backend"): "tests.models.test_xcom.CustomXCom"})
-    def test_get_one_doesnt_use_orm_deserialize_value(self, session):
+    def test_get_one_custom_backend_no_use_orm_deserialize_value(self, dag_run, session):
         """Test that XCom.get_one does not call orm_deserialize_value"""
-        json_obj = {"key": "value"}
-        execution_date = timezone.utcnow()
-        key = XCOM_RETURN_KEY
-        dag_id = "test_dag"
-        task_id = "test_task"
-
         XCom = resolve_xcom_backend()
         XCom.set(
-            key=key,
-            value=json_obj,
-            dag_id=dag_id,
-            task_id=task_id,
-            execution_date=execution_date,
+            key=XCOM_RETURN_KEY,
+            value={"key": "value"},
+            dag_id=dag_run.dag_id,
+            task_id="test_task",
+            run_id=dag_run.run_id,
+            session=session,
+        )
+
+        value = XCom.get_one(
+            dag_id=dag_run.dag_id,
+            task_id="test_task",
+            run_id=dag_run.run_id,
             session=session,
         )
+        assert value == {"key": "value"}
+        XCom.orm_deserialize_value.assert_not_called()
+
+
+class TestXComGet:
+    @pytest.fixture(
+        autouse=True,
+        params=[
+            pytest.param("true", id="enable_xcom_pickling=true"),
+            pytest.param("false", id="enable_xcom_pickling=false"),
+        ],
+    )
+    def setup_xcom(self, request):
+        with conf_vars({("core", "enable_xcom_pickling"): str(request.param)}):
+            yield
+
+    @pytest.fixture()
+    def push_simple_json_xcom(self, session):
+        def func(*, dag_run: DagRun, task_id: str, key: str, value):
+            return XCom.set(
+                key=key,
+                value=value,
+                dag_id=dag_run.dag_id,
+                task_id=task_id,
+                run_id=dag_run.run_id,
+                session=session,
+            )
+
+        return func
+
+    @pytest.fixture()
+    def setup_for_xcom_get_one(self, dag_run, push_simple_json_xcom):
+        push_simple_json_xcom(dag_run=dag_run, task_id="task_id_1", key="xcom_1", value={"key": "value"})
+
+    @pytest.mark.usefixtures("setup_for_xcom_get_one")
+    def test_xcom_get_one(self, session, dag_run):
+        stored_value = XCom.get_one(
+            key="xcom_1",
+            dag_id=dag_run.dag_id,
+            task_id="task_id_1",
+            run_id=dag_run.run_id,
+            session=session,
+        )
+        assert stored_value == {"key": "value"}
+
+    @pytest.mark.usefixtures("setup_for_xcom_get_one")
+    def test_xcom_get_one_with_execution_date(self, session, dag_run):
+        with pytest.deprecated_call():
+            stored_value = XCom.get_one(
+                key="xcom_1",
+                dag_id=dag_run.dag_id,
+                task_id="task_id_1",
+                execution_date=dag_run.logical_date,
+                session=session,
+            )
+        assert stored_value == {"key": "value"}
+
+    @pytest.fixture()
+    def dag_runs_for_xcom_get_one_from_prior_date(self, dag_run_factory, push_simple_json_xcom):
+        date1 = timezone.datetime(2021, 12, 3, 4, 56)
+        dr1 = dag_run_factory(dag_id="dag", execution_date=date1)
+        dr2 = dag_run_factory(dag_id="dag", execution_date=date1 + datetime.timedelta(days=1))
+
+        # The earlier run pushes an XCom, but not the later run, but the later
+        # run can get this earlier XCom with ``include_prior_dates``.
+        push_simple_json_xcom(dag_run=dr1, task_id="task_1", key="xcom_1", value={"key": "value"})
+
+        return dr1, dr2
+
+    def test_xcom_get_one_from_prior_date(self, session, dag_runs_for_xcom_get_one_from_prior_date):
+        _, dr2 = dag_runs_for_xcom_get_one_from_prior_date
+        retrieved_value = XCom.get_one(
+            run_id=dr2.run_id,
+            key="xcom_1",
+            task_id="task_1",
+            dag_id="dag",
+            include_prior_dates=True,
+            session=session,
+        )
+        assert retrieved_value == {"key": "value"}
+
+    def test_xcom_get_one_from_prior_with_execution_date(
+        self,
+        session,
+        dag_runs_for_xcom_get_one_from_prior_date,
+    ):
+        _, dr2 = dag_runs_for_xcom_get_one_from_prior_date
+        with pytest.deprecated_call():
+            retrieved_value = XCom.get_one(
+                execution_date=dr2.execution_date,
+                key="xcom_1",
+                task_id="task_1",
+                dag_id="dag",
+                include_prior_dates=True,
+                session=session,
+            )
+        assert retrieved_value == {"key": "value"}
+
+    @pytest.fixture()
+    def setup_for_xcom_get_many_single_argument_value(self, dag_run, push_simple_json_xcom):
+        push_simple_json_xcom(dag_run=dag_run, task_id="task_id_1", key="xcom_1", value={"key": "value"})
+
+    @pytest.mark.usefixtures("setup_for_xcom_get_many_single_argument_value")
+    def test_xcom_get_many_single_argument_value(self, session, dag_run):
+        stored_xcoms = XCom.get_many(
+            key="xcom_1",
+            dag_ids=dag_run.dag_id,
+            task_ids="task_id_1",
+            run_id=dag_run.run_id,
+            session=session,
+        ).all()
+        assert len(stored_xcoms) == 1
+        assert stored_xcoms[0].key == "xcom_1"
+        assert stored_xcoms[0].value == {"key": "value"}
+
+    @pytest.mark.usefixtures("setup_for_xcom_get_many_single_argument_value")
+    def test_xcom_get_many_single_argument_value_with_execution_date(self, session, dag_run):
+        with pytest.deprecated_call():
+            stored_xcoms = XCom.get_many(
+                execution_date=dag_run.logical_date,
+                key="xcom_1",
+                dag_ids=dag_run.dag_id,
+                task_ids="task_id_1",
+                session=session,
+            ).all()
+        assert len(stored_xcoms) == 1
+        assert stored_xcoms[0].key == "xcom_1"
+        assert stored_xcoms[0].value == {"key": "value"}
+
+    @pytest.fixture()
+    def setup_for_xcom_get_many_multiple_tasks(self, dag_run, push_simple_json_xcom):
+        push_simple_json_xcom(dag_run=dag_run, key="xcom_1", value={"key1": "value1"}, task_id="task_id_1")
+        push_simple_json_xcom(dag_run=dag_run, key="xcom_1", value={"key2": "value2"}, task_id="task_id_2")
+
+    @pytest.mark.usefixtures("setup_for_xcom_get_many_multiple_tasks")
+    def test_xcom_get_many_multiple_tasks(self, session, dag_run):
+        stored_xcoms = XCom.get_many(
+            key="xcom_1",
+            dag_ids=dag_run.dag_id,
+            task_ids=["task_id_1", "task_id_2"],
+            run_id=dag_run.run_id,
+            session=session,
+        )
+        sorted_values = [x.value for x in sorted(stored_xcoms, key=operator.attrgetter("task_id"))]
+        assert sorted_values == [{"key1": "value1"}, {"key2": "value2"}]
+
+    @pytest.mark.usefixtures("setup_for_xcom_get_many_multiple_tasks")
+    def test_xcom_get_many_multiple_tasks_with_execution_date(self, session, dag_run):
+        with pytest.deprecated_call():
+            stored_xcoms = XCom.get_many(
+                execution_date=dag_run.logical_date,
+                key="xcom_1",
+                dag_ids=dag_run.dag_id,
+                task_ids=["task_id_1", "task_id_2"],
+                session=session,
+            )
+        sorted_values = [x.value for x in sorted(stored_xcoms, key=operator.attrgetter("task_id"))]
+        assert sorted_values == [{"key1": "value1"}, {"key2": "value2"}]
+
+    @pytest.fixture()
+    def dag_runs_for_xcom_get_many_from_prior_dates(self, dag_run_factory, push_simple_json_xcom):
+        date1 = timezone.datetime(2021, 12, 3, 4, 56)
+        date2 = date1 + datetime.timedelta(days=1)
+        dr1 = dag_run_factory(dag_id="dag", execution_date=date1)
+        dr2 = dag_run_factory(dag_id="dag", execution_date=date2)
+        push_simple_json_xcom(dag_run=dr1, task_id="task_1", key="xcom_1", value={"key1": "value1"})
+        push_simple_json_xcom(dag_run=dr2, task_id="task_1", key="xcom_1", value={"key2": "value2"})
+        return dr1, dr2
+
+    def test_xcom_get_many_from_prior_dates(self, session, dag_runs_for_xcom_get_many_from_prior_dates):
+        dr1, dr2 = dag_runs_for_xcom_get_many_from_prior_dates
+        stored_xcoms = XCom.get_many(
+            run_id=dr2.run_id,
+            key="xcom_1",
+            dag_ids="dag",
+            task_ids="task_1",
+            include_prior_dates=True,
+            session=session,
+        )
+
+        # The retrieved XComs should be ordered by logical date, latest first.
+        assert [x.value for x in stored_xcoms] == [{"key2": "value2"}, {"key1": "value1"}]
+        assert [x.execution_date for x in stored_xcoms] == [dr2.logical_date, dr1.logical_date]
+
+    def test_xcom_get_many_from_prior_dates_with_execution_date(
+        self,
+        session,
+        dag_runs_for_xcom_get_many_from_prior_dates,
+    ):
+        dr1, dr2 = dag_runs_for_xcom_get_many_from_prior_dates
+        with pytest.deprecated_call():
+            stored_xcoms = XCom.get_many(
+                execution_date=dr2.execution_date,
+                key="xcom_1",
+                dag_ids="dag",
+                task_ids="task_1",
+                include_prior_dates=True,
+                session=session,
+            )
+
+        # The retrieved XComs should be ordered by logical date, latest first.
+        assert [x.value for x in stored_xcoms] == [{"key2": "value2"}, {"key1": "value1"}]
+        assert [x.execution_date for x in stored_xcoms] == [dr2.logical_date, dr1.logical_date]
 
-        value = XCom.get_one(dag_id=dag_id, task_id=task_id, execution_date=execution_date, session=session)
 
-        assert value == json_obj
+# TODO: Tests for set and clear (both run_id and execution_date).

Review comment:
       Still not finished
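The `include_prior_dates` tests in this diff pin down a contract: with the flag set, a lookup anchored on a later run also sees XComs pushed by earlier runs of the same DAG, newest logical date first. A minimal in-memory sketch of that contract; `Row`, `get_many` and the list-based store are hypothetical stand-ins for the SQL query the real `XCom.get_many` builds, and `run_dates` being keyed by `run_id` alone is a simplification.

```python
import datetime
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Set, Union


@dataclass(frozen=True)
class Row:
    """Hypothetical in-memory stand-in for one xcom table row."""

    dag_id: str
    task_id: str
    key: str
    run_id: str
    logical_date: datetime.datetime
    value: Any


def _as_set(ids: Union[str, Iterable[str]]) -> Set[str]:
    # The tests pass either a single id or a list of ids.
    return {ids} if isinstance(ids, str) else set(ids)


def get_many(
    rows: List[Row],
    run_dates: Dict[str, datetime.datetime],
    *,
    run_id: str,
    key: str,
    dag_ids: Union[str, Iterable[str]],
    task_ids: Union[str, Iterable[str]],
    include_prior_dates: bool = False,
) -> List[Row]:
    # The anchor run may not have pushed anything itself (as in the prior-date
    # fixtures), so its logical date comes from a separate run mapping.
    anchor = run_dates[run_id]
    dags, tasks = _as_set(dag_ids), _as_set(task_ids)
    matches = [
        r
        for r in rows
        if r.key == key
        and r.dag_id in dags
        and r.task_id in tasks
        and (r.logical_date <= anchor if include_prior_dates else r.run_id == run_id)
    ]
    # Newest logical date first, matching the ordering the tests assert.
    return sorted(matches, key=lambda r: r.logical_date, reverse=True)
```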







[GitHub] [airflow] github-actions[bot] commented on pull request #19825: Deprecate passing execution_date to XCom methods

github-actions[bot] commented on pull request #19825:
URL: https://github.com/apache/airflow/pull/19825#issuecomment-988353130


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it onto the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] uranusjr commented on pull request #19825: Deprecate passing execution_date to XCom methods

uranusjr commented on pull request #19825:
URL: https://github.com/apache/airflow/pull/19825#issuecomment-987787821


   Merging in the Mypy and Sphinx fixes; let’s see if this works.





[GitHub] [airflow] ashb merged pull request #19825: Deprecate passing execution_date to XCom methods

ashb merged pull request #19825:
URL: https://github.com/apache/airflow/pull/19825


   





[GitHub] [airflow] uranusjr commented on pull request #19825: Deprecate passing execution_date to XCom methods

uranusjr commented on pull request #19825:
URL: https://github.com/apache/airflow/pull/19825#issuecomment-990398811


   `UPGRADING.md` entry added. I think we also want to migrate `XCom` to add the `run_id` column in 2.3, and the upgrading note will probably be changed when that is implemented.

