Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/30 07:14:28 UTC

[GitHub] [airflow] blag opened a new pull request, #25419: WIP: Create a pluggable DatasetEventManager

blag opened a new pull request, #25419:
URL: https://github.com/apache/airflow/pull/25419

   
   Implement a pluggable dataset event manager. The idea is to route all of the logic that creates dataset events and adds rows to the `DatasetDagRunQueue` table through this manager. On top of inserting into the `DatasetEvent` and `DatasetDagRunQueue` tables, plugins can broadcast dataset events to other deployments (with some caveats) and accept dataset events broadcast by other deployments.
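As a rough illustration of the plugin idea, here is a minimal, self-contained sketch. `Dataset`, `BaseEventManager`, `BroadcastingEventManager`, and the injected `publish` callable are hypothetical stand-ins, not Airflow's API: the real default manager writes `DatasetEvent` and `DatasetDagRunQueue` rows through a SQLAlchemy session, and the receiving side of a broadcast is not shown here.

```python
from typing import Callable, List


class Dataset:
    # Hypothetical stand-in for airflow.datasets.Dataset, identified by URI.
    def __init__(self, uri: str):
        self.uri = uri


class BaseEventManager:
    # Hypothetical stand-in for the default manager. The real one inserts
    # DatasetEvent and DatasetDagRunQueue rows; this one just records URIs.
    def __init__(self) -> None:
        self.events: List[str] = []

    def register_dataset_change(self, *, task_instance, dataset, extra=None, **kwargs) -> None:
        self.events.append(dataset.uri)


class BroadcastingEventManager(BaseEventManager):
    # Hypothetical plugin: keep the local bookkeeping, then publish the event
    # to other deployments via an injected callable (e.g. a message-queue client).
    def __init__(self, publish: Callable[[dict], None]) -> None:
        super().__init__()
        self.publish = publish

    def register_dataset_change(self, *, task_instance, dataset, extra=None, **kwargs) -> None:
        super().register_dataset_change(
            task_instance=task_instance, dataset=dataset, extra=extra, **kwargs
        )
        self.publish({"uri": dataset.uri, "extra": extra})


sent: List[dict] = []
manager = BroadcastingEventManager(publish=sent.append)
manager.register_dataset_change(task_instance=None, dataset=Dataset("s3://bucket/key"), extra={"rows": 10})
```

Calling `super()` first keeps the local rows authoritative; the broadcast is layered on top rather than replacing them.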
   
   The `core.dataset_event_manager_class` configuration variable is an import string identifying the class to instantiate.
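A minimal sketch of how an import string could be resolved to a class and instantiated. It uses `importlib` directly as an assumption; the PR may instead rely on Airflow's own `airflow.utils.module_loading.import_string` helper. The `mypackage.managers.BroadcastingEventManager` path is hypothetical, so a stdlib class stands in for the usage line.

```python
import importlib


def load_class(import_path: str) -> type:
    """Resolve a dotted 'package.module.ClassName' string to the class object."""
    module_path, _, class_name = import_path.rpartition(".")
    if not module_path:
        raise ImportError(f"{import_path!r} is not a dotted 'module.ClassName' path")
    module = importlib.import_module(module_path)
    try:
        return getattr(module, class_name)
    except AttributeError as err:
        raise ImportError(f"Module {module_path!r} has no attribute {class_name!r}") from err


# Assumed airflow.cfg layout (hypothetical class path):
#   [core]
#   dataset_event_manager_class = mypackage.managers.BroadcastingEventManager
# A stdlib class stands in here so the sketch runs anywhere.
manager_cls = load_class("collections.OrderedDict")
manager = manager_cls()
```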
   
   Comments and feedback are welcome.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] blag commented on a diff in pull request #25419: WIP: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
blag commented on code in PR #25419:
URL: https://github.com/apache/airflow/pull/25419#discussion_r946218615


##########
airflow/models/dataset.py:
##########
@@ -203,6 +206,44 @@ def __repr__(self):
         return f"{self.__class__.__name__}({', '.join(args)})"
 
 
+class DatasetEventManager(LoggingMixin):
+    """
+    A pluggable class that manages operations for dataset events.
+
+    The intent is to have one place to handle all DatasetEvent-related operations, so different
+    Airflow deployments can use plugins that broadcast dataset events to each other.
+    """
+
+    def register_dataset_change(
+        self, *, task_instance=None, dataset=None, session: Session = NEW_SESSION

Review Comment:
   I like `extra` better, since we're closer to the DB here than we are to an HTTP request.





[GitHub] [airflow] blag commented on a diff in pull request #25419: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
blag commented on code in PR #25419:
URL: https://github.com/apache/airflow/pull/25419#discussion_r951843886


##########
tests/datasets/test_manager.py:
##########
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import mock
+
+import pytest
+
+from airflow.datasets import Dataset
+from airflow.datasets.manager import DatasetEventManager
+from airflow.models.dataset import DatasetModel
+
+
+@pytest.fixture()
+def mock_task_instance():
+    mock_ti = mock.Mock()
+    mock_ti.task_id = "5"
+    mock_ti.dag_id = "7"
+    mock_ti.run_id = "11"
+    mock_ti.map_index = "13"
+    return mock_ti
+
+
+def create_mock_dag():
+    n = 1
+    while True:
+        mock_dag = mock.Mock()
+        mock_dag.dag_id = n
+        n += 1
+        yield mock_dag
+
+
+class TestDatasetEventManager:
+    def test_register_dataset_change_dataset_doesnt_exist(self, mock_task_instance):
+        dsem = DatasetEventManager()
+
+        dataset = Dataset(uri="dataset_doesnt_exist")
+
+        mock_session = mock.Mock()
+        # Gotta mock up the query results
+        mock_session.query.return_value.filter.return_value.one_or_none.return_value = None
+
+        dsem.register_dataset_change(task_instance=mock_task_instance, dataset=dataset, session=mock_session)
+
+        # Ensure that we have ignored the dataset and _not_ created a DatasetEvent or
+        # DatasetDagRunQueue rows
+        mock_session.add.assert_not_called()
+        mock_session.merge.assert_not_called()
+
+    def test_register_dataset_change_with_dataset(self, mock_task_instance):
+        dsem = DatasetEventManager()
+
+        mock_dag_1 = mock.MagicMock()
+        mock_dag_1.dag_id = 1
+        mock_dag_2 = mock.MagicMock()
+        mock_dag_2.dag_id = 2
+
+        ds = Dataset("test_dataset_uri")
+
+        dsm = DatasetModel(uri=ds.uri)
+        dsm.consuming_dags = [mock_dag_1, mock_dag_2]
+
+        mock_session = mock.Mock()
+        # Gotta mock up the query results
+        mock_session.query.return_value.filter.return_value.one_or_none.return_value = dsm
+
+        dsem.register_dataset_change(task_instance=mock_task_instance, dataset=ds, session=mock_session)
+
+        # Ensure we've created a dataset
+        mock_session.add.assert_called_once()
+        # Ensure that we've created DatasetDagRunQueue rows
+        assert mock_session.merge.call_count == 2
+
+    def test_register_dataset_change_with_datasetmodel(self, mock_task_instance):

Review Comment:
   Since we type hint to only accept `Dataset` and we check for that in the caller, I just removed the test that passed in `DatasetModel`.
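The tests above stub the database lookup by chaining `return_value` attributes on a `mock.Mock` session. A minimal standalone sketch of why that works; `lookup_model` is a hypothetical helper that mirrors the `session.query(...).filter(...).one_or_none()` call shape used by the manager.

```python
from unittest import mock


def lookup_model(session, model, uri):
    # Same call chain the manager uses: query -> filter -> one_or_none.
    return session.query(model).filter(model.uri == uri).one_or_none()


session = mock.Mock()

# Each call in the chain returns the same child mock regardless of its
# arguments, so one assignment fixes the final result of the whole chain.
session.query.return_value.filter.return_value.one_or_none.return_value = None
missing = lookup_model(session, mock.Mock(), "dataset_doesnt_exist")

found = object()
session.query.return_value.filter.return_value.one_or_none.return_value = found
present = lookup_model(session, mock.Mock(), "test_dataset_uri")
```

Because the stub ignores the filter arguments, these tests check what the manager does with the lookup result, not that it queries by the right URI.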





[GitHub] [airflow] dstandish commented on a diff in pull request #25419: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #25419:
URL: https://github.com/apache/airflow/pull/25419#discussion_r951808900


##########
airflow/models/taskinstance.py:
##########
@@ -1525,33 +1528,17 @@ def _run_raw_task(
             session.add(Log(self.state, self))
             session.merge(self)
             if self.state == TaskInstanceState.SUCCESS:
-                self._create_dataset_dag_run_queue_records(session=session)
+                self._register_dataset_changes(session=session)
             session.commit()
 
-    def _create_dataset_dag_run_queue_records(self, *, session: Session) -> None:
-        from airflow.datasets import Dataset
-        from airflow.models.dataset import DatasetModel
-
+    def _register_dataset_changes(self, *, session: Session) -> None:
         for obj in self.task.outlets or []:
             self.log.debug("outlet obj %s", obj)
-            if isinstance(obj, Dataset):
-                dataset = session.query(DatasetModel).filter(DatasetModel.uri == obj.uri).one_or_none()
-                if not dataset:
-                    self.log.warning("Dataset %s not found", obj)
-                    continue
-                consuming_dag_ids = [x.dag_id for x in dataset.consuming_dags]
-                self.log.debug("consuming dag ids %s", consuming_dag_ids)
-                session.add(
-                    DatasetEvent(
-                        dataset_id=dataset.id,
-                        source_task_id=self.task_id,
-                        source_dag_id=self.dag_id,
-                        source_run_id=self.run_id,
-                        source_map_index=self.map_index,
-                    )
-                )
-                for dag_id in consuming_dag_ids:
-                    session.merge(DatasetDagRunQueue(dataset_id=dataset.id, target_dag_id=dag_id))
+            self.dataset_event_manager.register_dataset_change(

Review Comment:
   probably makes sense to make sure you are dealing with a dataset _here_, no?
   
   i.e. to make sure `obj` is a Dataset and not something else?
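A minimal sketch of the caller-side check being suggested: filter `task.outlets` with `isinstance` so the manager only ever receives datasets. `Dataset`, `RecordingManager`, and `register_dataset_changes` here are simplified stand-ins (assumptions), not the code that was merged.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Dataset:
    # Stand-in for airflow.datasets.Dataset.
    uri: str


class RecordingManager:
    # Stand-in manager that just remembers which datasets it was handed.
    def __init__(self):
        self.seen = []

    def register_dataset_change(self, *, task_instance, dataset, session=None, **kwargs):
        self.seen.append(dataset.uri)


def register_dataset_changes(outlets, manager, task_instance=None, session=None):
    # Outlets may hold objects that are not datasets, so filter before calling
    # the pluggable manager; the manager itself can then assume a Dataset.
    for obj in outlets or []:
        if isinstance(obj, Dataset):
            manager.register_dataset_change(task_instance=task_instance, dataset=obj, session=session)


manager = RecordingManager()
register_dataset_changes([Dataset("s3://a"), "not-a-dataset", Dataset("s3://b")], manager)
```

Doing the check in the caller keeps plugin implementations simpler: they never have to decide what to do with a non-dataset outlet.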



##########
airflow/datasets/manager.py:
##########
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from sqlalchemy.orm.session import Session
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class DatasetEventManager(LoggingMixin):
+    """
+    A pluggable class that manages operations for dataset events.
+
+    The intent is to have one place to handle all DatasetEvent-related operations, so different
+    Airflow deployments can use plugins that broadcast dataset events to each other.
+    """
+
+    def register_dataset_change(self, *, task_instance, dataset, extra=None, session: Session) -> None:
+        """
+        For local datasets, look them up, record the dataset event, queue dagruns, and broadcast
+        the dataset event
+        """
+        from airflow.datasets import Dataset
+        from airflow.models.dataset import DatasetEvent, DatasetModel
+
+        if isinstance(dataset, Dataset):

Review Comment:
   not sure we need to check the type here given that the method is `register_dataset_change`



##########
airflow/datasets/manager.py:
##########
@@ -0,0 +1,62 @@
+from sqlalchemy.orm.session import Session
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class DatasetEventManager(LoggingMixin):
+    """
+    A pluggable class that manages operations for dataset events.
+
+    The intent is to have one place to handle all DatasetEvent-related operations, so different
+    Airflow deployments can use plugins that broadcast dataset events to each other.
+    """
+
+    def register_dataset_change(self, *, task_instance, dataset, extra=None, session: Session) -> None:
+        """
+        For local datasets, look them up, record the dataset event, queue dagruns, and broadcast
+        the dataset event
+        """
+        from airflow.datasets import Dataset
+        from airflow.models.dataset import DatasetEvent, DatasetModel
+
+        if isinstance(dataset, Dataset):

Review Comment:
   what else would it be?  do we want pluggability to extend to "anything you can put in outlets"?



##########
airflow/datasets/manager.py:
##########
@@ -0,0 +1,62 @@
+from sqlalchemy.orm.session import Session
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class DatasetEventManager(LoggingMixin):
+    """
+    A pluggable class that manages operations for dataset events.
+
+    The intent is to have one place to handle all DatasetEvent-related operations, so different
+    Airflow deployments can use plugins that broadcast dataset events to each other.
+    """
+
+    def register_dataset_change(self, *, task_instance, dataset, extra=None, session: Session) -> None:

Review Comment:
   what do people think about accepting `**kwargs` here? perhaps that would help with forward compatibility if we add new features / interaction capabilities. if we change the interface and the way we call these methods, we may otherwise have to inspect the signature or check an interface version or something... @uranusjr thoughts?
   
   on one hand, accepting kwargs feels like it would allow a little more freedom... on the other hand, depending on the changes we make, it might behoove us to check the signature (or version) anyway to know how it will operate / what it expects 🤷
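A minimal sketch of the trade-off being discussed. The class names and the new `run_type` argument are hypothetical: a plugin override that accepts `**kwargs` keeps working when a newer caller passes an extra keyword, while one with a closed signature raises `TypeError`. `inspect.signature` illustrates the alternative of checking what a plugin accepts before calling it.

```python
import inspect


class StrictPlugin:
    def register_dataset_change(self, *, task_instance, dataset, extra=None, session=None):
        return "strict"


class TolerantPlugin:
    def register_dataset_change(self, *, task_instance, dataset, extra=None, session=None, **kwargs):
        # Unknown keywords from newer callers are accepted and ignored.
        return "tolerant"


def call_with_new_arg(plugin):
    # Simulates a newer caller passing an argument older plugins do not know about.
    return plugin.register_dataset_change(task_instance=None, dataset=None, session=None, run_type="manual")


def accepts_kwarg(func, name):
    # Alternative: inspect the signature before deciding what to pass.
    params = inspect.signature(func).parameters
    return name in params or any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values())


tolerant_result = call_with_new_arg(TolerantPlugin())
try:
    call_with_new_arg(StrictPlugin())
    strict_failed = False
except TypeError:
    strict_failed = True
```

Accepting `**kwargs` silently drops arguments an old plugin cannot use. That is fine for optional information, but if a future argument changes required behavior, a signature or version check may still be needed.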





[GitHub] [airflow] blag commented on a diff in pull request #25419: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
blag commented on code in PR #25419:
URL: https://github.com/apache/airflow/pull/25419#discussion_r951938512


##########
airflow/datasets/manager.py:
##########
@@ -0,0 +1,62 @@
+from sqlalchemy.orm.session import Session
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class DatasetEventManager(LoggingMixin):
+    """
+    A pluggable class that manages operations for dataset events.
+
+    The intent is to have one place to handle all DatasetEvent-related operations, so different
+    Airflow deployments can use plugins that broadcast dataset events to each other.
+    """
+
+    def register_dataset_change(self, *, task_instance, dataset, extra=None, session: Session) -> None:

Review Comment:
   Good call. Forward compatibility with any changes we make in future Airflow versions will help plugin writers not have to do wild things to support multiple versions of Airflow.





[GitHub] [airflow] blag commented on a diff in pull request #25419: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
blag commented on code in PR #25419:
URL: https://github.com/apache/airflow/pull/25419#discussion_r949493545


##########
airflow/datasets/manager.py:
##########
@@ -0,0 +1,62 @@
+from sqlalchemy.orm.session import Session
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class DatasetEventManager(LoggingMixin):
+    """
+    A pluggable class that manages operations for dataset events.
+
+    The intent is to have one place to handle all DatasetEvent-related operations, so different
+    Airflow deployments can use plugins that broadcast dataset events to each other.
+    """
+
+    def register_dataset_change(self, *, task_instance, dataset, extra=None, session: Session) -> None:
+        """
+        For local datasets, look them up, record the dataset event, queue dagruns, and broadcast
+        the dataset event
+        """
+        from airflow.datasets import Dataset
+        from airflow.models.dataset import DatasetEvent, DatasetModel
+
+        if isinstance(dataset, Dataset):

Review Comment:
   Well, I didn't want to change the [original behavior](https://github.com/apache/airflow/pull/25419/files#diff-62f7d8a52fefdb8e05d4f040c6d3459b4a56fe46976c24f68843dbaeb5a98487L1537).
   
   I _think_ the only other reasonable thing that somebody would pass in as the `dataset` parameter is a DatasetModel object, in which case we can (presumably) skip pulling it from the database. I've added a test for that case.





[GitHub] [airflow] uranusjr commented on a diff in pull request #25419: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #25419:
URL: https://github.com/apache/airflow/pull/25419#discussion_r948787360


##########
airflow/datasets/manager.py:
##########
@@ -0,0 +1,62 @@
+from sqlalchemy.orm.session import Session
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class DatasetEventManager(LoggingMixin):
+    """
+    A pluggable class that manages operations for dataset events.
+
+    The intent is to have one place to handle all DatasetEvent-related operations, so different
+    Airflow deployments can use plugins that broadcast dataset events to each other.
+    """
+
+    def register_dataset_change(self, *, task_instance, dataset, extra=None, session: Session) -> None:
+        """
+        For local datasets, look them up, record the dataset event, queue dagruns, and broadcast
+        the dataset event
+        """
+        from airflow.datasets import Dataset
+        from airflow.models.dataset import DatasetEvent, DatasetModel
+
+        if isinstance(dataset, Dataset):
+            dataset = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
+            if not dataset:
+                self.log.warning("Dataset %s not found", dataset)
+                return
+            session.add(
+                DatasetEvent(
+                    dataset_id=dataset.id,
+                    source_task_id=task_instance.task_id,
+                    source_dag_id=task_instance.dag_id,
+                    source_run_id=task_instance.run_id,
+                    source_map_index=task_instance.map_index,
+                    extra=extra,
+                )
+            )
+            self._queue_dagruns(dataset, session)

Review Comment:
   ```suggestion
           if isinstance(dataset, Dataset):
               dataset = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
               if not dataset:
                   self.log.warning("Dataset %s not found", dataset)
                   return
           session.add(
               DatasetEvent(
                   dataset_id=dataset.id,
                   source_task_id=task_instance.task_id,
                   source_dag_id=task_instance.dag_id,
                   source_run_id=task_instance.run_id,
                   source_map_index=task_instance.map_index,
                   extra=extra,
               )
           )
           self._queue_dagruns(dataset, session)
   ```
   
   I think?
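The suggestion dedents everything after the lookup, so the event is recorded and DAG runs are queued whether the caller passed a `Dataset` (looked up by URI) or an already-loaded model. Below is a self-contained sketch of that flow. An in-memory dict stands in for the database, and `Dataset`, `DatasetModel`, and `InMemoryEventManager` are simplified stand-ins (assumptions); the `task_instance` source fields are omitted for brevity. One detail in the quoted code: because `dataset` is reassigned to the query result before the warning, the message logs `None` rather than the missing dataset. The sketch keeps the original object in a separate variable.

```python
import logging
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple

log = logging.getLogger(__name__)


@dataclass(frozen=True)
class Dataset:
    uri: str


@dataclass
class DatasetModel:
    id: int
    uri: str
    consuming_dag_ids: List[str] = field(default_factory=list)


class InMemoryEventManager:
    def __init__(self, models: Dict[str, DatasetModel]):
        self.models = models  # stands in for the dataset table, keyed by URI
        self.events: List[Tuple[int, Optional[dict]]] = []  # (dataset_id, extra)
        # A set, so re-queueing the same (dataset_id, dag_id) pair is a no-op.
        self.queue: Set[Tuple[int, str]] = set()

    def register_dataset_change(self, *, dataset, extra=None, **kwargs) -> None:
        model = dataset
        if isinstance(dataset, Dataset):
            model = self.models.get(dataset.uri)
            if model is None:
                # Log the original object, not the None returned by the lookup.
                log.warning("Dataset %s not found", dataset)
                return
        # Dedented: runs for both a looked-up model and one passed in directly.
        self.events.append((model.id, extra))
        for dag_id in model.consuming_dag_ids:
            self.queue.add((model.id, dag_id))


manager = InMemoryEventManager({"s3://a": DatasetModel(1, "s3://a", ["dag_x", "dag_y"])})
manager.register_dataset_change(dataset=Dataset("s3://a"), extra={"rows": 3})
manager.register_dataset_change(dataset=Dataset("s3://missing"))
manager.register_dataset_change(dataset=DatasetModel(2, "s3://b", ["dag_z"]))
```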





[GitHub] [airflow] blag commented on a diff in pull request #25419: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
blag commented on code in PR #25419:
URL: https://github.com/apache/airflow/pull/25419#discussion_r951841498


##########
airflow/datasets/manager.py:
##########
@@ -0,0 +1,62 @@
+from sqlalchemy.orm.session import Session
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class DatasetEventManager(LoggingMixin):
+    """
+    A pluggable class that manages operations for dataset events.
+
+    The intent is to have one place to handle all DatasetEvent-related operations, so different
+    Airflow deployments can use plugins that broadcast dataset events to each other.
+    """
+
+    def register_dataset_change(self, *, task_instance, dataset, extra=None, session: Session) -> None:
+        """
+        For local datasets, look them up, record the dataset event, queue dagruns, and broadcast
+        the dataset event
+        """
+        from airflow.datasets import Dataset
+        from airflow.models.dataset import DatasetEvent, DatasetModel
+
+        if isinstance(dataset, Dataset):

Review Comment:
   Nope, just datasets. Fixed. :+1:





[GitHub] [airflow] jedcunningham merged pull request #25419: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
jedcunningham merged PR #25419:
URL: https://github.com/apache/airflow/pull/25419




[GitHub] [airflow] dstandish commented on a diff in pull request #25419: WIP: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #25419:
URL: https://github.com/apache/airflow/pull/25419#discussion_r937179650


##########
airflow/models/dataset.py:
##########
@@ -203,6 +206,44 @@ def __repr__(self):
         return f"{self.__class__.__name__}({', '.join(args)})"
 
 
+class DatasetEventManager(LoggingMixin):
+    """
+    A pluggable class that manages operations for dataset events.
+
+    The intent is to have one place to handle all DatasetEvent-related operations, so different
+    Airflow deployments can use plugins that broadcast dataset events to each other.
+    """
+
+    def register_dataset_change(
+        self, *, task_instance=None, dataset=None, session: Session = NEW_SESSION

Review Comment:
   should add `extra` to signature here and pass to DatasetEvent
   
   should we consider renaming it `payload`? wdyt @jedcunningham @ashb ?





[GitHub] [airflow] blag commented on a diff in pull request #25419: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
blag commented on code in PR #25419:
URL: https://github.com/apache/airflow/pull/25419#discussion_r951779806


##########
airflow/datasets/manager.py:
##########
@@ -0,0 +1,62 @@
+from sqlalchemy.orm.session import Session
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class DatasetEventManager(LoggingMixin):
+    """
+    A pluggable class that manages operations for dataset events.
+
+    The intent is to have one place to handle all DatasetEvent-related operations, so different
+    Airflow deployments can use plugins that broadcast dataset events to each other.
+    """
+
+    def register_dataset_change(self, *, task_instance, dataset, extra=None, session: Session) -> None:

Review Comment:
   Done.





[GitHub] [airflow] blag commented on a diff in pull request #25419: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
blag commented on code in PR #25419:
URL: https://github.com/apache/airflow/pull/25419#discussion_r951843553


##########
airflow/datasets/manager.py:
##########
@@ -0,0 +1,62 @@
+from sqlalchemy.orm.session import Session
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class DatasetEventManager(LoggingMixin):
+    """
+    A pluggable class that manages operations for dataset events.
+
+    The intent is to have one place to handle all DatasetEvent-related operations, so different
+    Airflow deployments can use plugins that broadcast dataset events to each other.
+    """
+
+    def register_dataset_change(self, *, task_instance, dataset, extra=None, session: Session) -> None:

Review Comment:
   I tweaked it to only accept a `Dataset`, and put an isinstance check in the caller, since I _think_ task outlets can be things that aren't datasets.
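A minimal sketch of the caller-side check described here, assuming outlets may mix `Dataset` objects with other values. `Dataset`, `RecordingManager`, and `register_dataset_changes` are local stand-ins for illustration, not Airflow's classes.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Dataset:
    """Local stand-in for airflow.datasets.Dataset (only the URI matters here)."""
    uri: str


@dataclass
class RecordingManager:
    """Hypothetical manager that just records which datasets it was handed."""
    seen: list = field(default_factory=list)

    def register_dataset_change(self, *, task_instance, dataset, session):
        self.seen.append(dataset.uri)


def register_dataset_changes(outlets, manager, *, task_instance=None, session=None):
    """Forward only Dataset outlets to the manager; skip anything else."""
    for obj in outlets or []:
        if not isinstance(obj, Dataset):
            # Non-dataset outlets never reach the manager.
            continue
        manager.register_dataset_change(task_instance=task_instance, dataset=obj, session=session)


manager = RecordingManager()
register_dataset_changes([Dataset("s3://bucket/a"), "not-a-dataset", Dataset("s3://bucket/b")], manager)
print(manager.seen)  # ['s3://bucket/a', 's3://bucket/b']
```

With the check in the caller, the manager can assume it always receives a `Dataset`.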





[GitHub] [airflow] uranusjr commented on a diff in pull request #25419: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #25419:
URL: https://github.com/apache/airflow/pull/25419#discussion_r948644141


##########
airflow/utils/dataset.py:
##########
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from airflow.configuration import conf
+
+
+def get_dataset_manager(*args, **kwargs):
+    return conf.getimport(
+        'core', 'dataset_event_manager_class', fallback='airflow.models.dataset.DatasetEventManager'
+    )(*args, **kwargs)

Review Comment:
   This wrapper doesn’t feel very useful
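If the wrapper goes away, the call site resolves and instantiates the configured class itself, which `conf.getimport(...)(...)` already does in one expression. For readers unfamiliar with that call, here is a rough stdlib-only approximation of the lookup. The helper names `import_string` and `load_manager_class` are local to this sketch (Airflow ships its own `import_string`), and unlike Airflow's config parser it only checks the `AIRFLOW__CORE__DATASET_EVENT_MANAGER_CLASS` environment variable, not `airflow.cfg`.

```python
import importlib
import os


def import_string(dotted_path):
    """Import 'package.module.Name' and return Name."""
    module_path, _, attr = dotted_path.rpartition(".")
    if not module_path:
        raise ImportError(f"{dotted_path!r} is not a dotted import path")
    module = importlib.import_module(module_path)
    try:
        return getattr(module, attr)
    except AttributeError as err:
        raise ImportError(f"{module_path!r} has no attribute {attr!r}") from err


def load_manager_class(fallback):
    # Airflow maps [core] dataset_event_manager_class to this environment variable;
    # this sketch does not read airflow.cfg at all.
    path = os.environ.get("AIRFLOW__CORE__DATASET_EVENT_MANAGER_CLASS", fallback)
    return import_string(path)


# At the call site, with Airflow installed, this would look something like:
#     manager = load_manager_class("airflow.datasets.manager.DatasetEventManager")()
```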





[GitHub] [airflow] jedcunningham commented on a diff in pull request #25419: WIP: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #25419:
URL: https://github.com/apache/airflow/pull/25419#discussion_r938286229


##########
airflow/models/dataset.py:
##########
@@ -203,6 +206,44 @@ def __repr__(self):
         return f"{self.__class__.__name__}({', '.join(args)})"
 
 
+class DatasetEventManager(LoggingMixin):
+    """
+    A pluggable class that manages operations for dataset events.
+
+    The intent is to have one place to handle all DatasetEvent-related operations, so different
+    Airflow deployments can use plugins that broadcast dataset events to each other.
+    """
+
+    def register_dataset_change(
+        self, *, task_instance=None, dataset=None, session: Session = NEW_SESSION

Review Comment:
   I don't have a strong preference here. I'm okay with both.





[GitHub] [airflow] ashb commented on pull request #25419: WIP: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #25419:
URL: https://github.com/apache/airflow/pull/25419#issuecomment-1201358123

   I think let's slim this down for now and only add the methods/"entrypoints" that the scheduler needs; a custom class can then use `super()` etc. to do what it wants.
   
   I _think_ we might also want to move some of the query/creating logic from Daniel's PR #24969 into this class (i.e. just moving the code and calling it from inside SchedulerJob) -- need to think more about query pattern/only-one-lock vs flexibility here. /cc @dstandish 
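A sketch of the `super()` pattern suggested here, assuming the default manager keeps a single `register_dataset_change` entrypoint. Both classes are simplified stand-ins: `BroadcastingDatasetEventManager` and its `publish` callable are hypothetical, and the base class records events in a list instead of writing `DatasetEvent` rows.

```python
class DatasetEventManager:
    """Stand-in for the default manager: records events locally."""

    def __init__(self):
        self.local_events = []

    def register_dataset_change(self, *, task_instance, dataset, extra=None, session):
        # The real class would add a DatasetEvent row and queue DAG runs via `session`.
        self.local_events.append((dataset, extra))


class BroadcastingDatasetEventManager(DatasetEventManager):
    """Hypothetical plugin: keep the default behaviour, then broadcast."""

    def __init__(self, publish):
        super().__init__()
        self.publish = publish  # e.g. a message-queue client's send function

    def register_dataset_change(self, *, task_instance, dataset, extra=None, session):
        super().register_dataset_change(
            task_instance=task_instance, dataset=dataset, extra=extra, session=session
        )
        self.publish({"dataset": dataset, "extra": extra})


sent = []
mgr = BroadcastingDatasetEventManager(publish=sent.append)
mgr.register_dataset_change(task_instance=None, dataset="s3://bucket/a", extra={"rows": 3}, session=None)
print(mgr.local_events)  # [('s3://bucket/a', {'rows': 3})]
print(sent)              # [{'dataset': 's3://bucket/a', 'extra': {'rows': 3}}]
```

Keeping the base class to a few entrypoints means a plugin only overrides what it needs and delegates the rest.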




[GitHub] [airflow] blag commented on a diff in pull request #25419: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
blag commented on code in PR #25419:
URL: https://github.com/apache/airflow/pull/25419#discussion_r948668071


##########
airflow/datasets/manager.py:
##########
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from sqlalchemy.orm.session import Session
+
+from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.session import NEW_SESSION
+
+
+class DatasetEventManager(LoggingMixin):
+    """
+    A pluggable class that manages operations for dataset events.
+
+    The intent is to have one place to handle all DatasetEvent-related operations, so different
+    Airflow deployments can use plugins that broadcast dataset events to each other.
+    """
+
+    def register_dataset_change(
+        self, *, task_instance=None, dataset=None, extra=None, session: Session = NEW_SESSION
+    ) -> None:
+        """
+        For local datasets, look them up, record the dataset event, queue dagruns, and broadcast
+        the dataset event
+        """
+        from airflow.datasets import Dataset
+        from airflow.models.dataset import DatasetModel
+
+        if isinstance(dataset, Dataset):
+            dataset = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
+            if not dataset:
+                self.log.warning("Dataset %s not found", dataset)
+                return
+        session.add(
+            DatasetEvent(
+                dataset_id=dataset.id,
+                source_task_id=task_instance.task_id,
+                source_dag_id=task_instance.dag_id,
+                source_run_id=task_instance.run_id,
+                source_map_index=task_instance.map_index,
+                extra=extra,
+            )
+        )

Review Comment:
   Nope, I caught those and I'm actually working on tests for this.
   
   Edit: And turning those into required, keyword-only arguments.
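Making the arguments required and keyword-only turns the `None` cases raised in review into errors at call time. A small stand-in function (not the Airflow method) showing the behaviour:

```python
def register_dataset_change(*, task_instance, dataset, extra=None, session):
    """Stand-in with the same parameter shape; returns what it was given."""
    return task_instance, dataset, extra, session


# Omitting a required keyword-only argument fails at call time,
# and the error names the missing parameter ('session' here).
try:
    register_dataset_change(task_instance="ti", dataset="ds")
except TypeError as err:
    print(err)

# Positional calls are rejected too, so arguments can't be passed in the wrong order.
try:
    register_dataset_change("ti", "ds", session=None)
except TypeError as err:
    print(err)
```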





[GitHub] [airflow] blag commented on a diff in pull request #25419: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
blag commented on code in PR #25419:
URL: https://github.com/apache/airflow/pull/25419#discussion_r948683163


##########
airflow/utils/dataset.py:
##########
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from airflow.configuration import conf
+
+
+def get_dataset_manager(*args, **kwargs):
+    return conf.getimport(
+        'core', 'dataset_event_manager_class', fallback='airflow.models.dataset.DatasetEventManager'
+    )(*args, **kwargs)

Review Comment:
   I'll remove it. :+1:





[GitHub] [airflow] ashb commented on pull request #25419: WIP: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #25419:
URL: https://github.com/apache/airflow/pull/25419#issuecomment-1216621240

   There is now an `airflow/datasets/` folder, so should we move this to `airflow/datasets/manager.py`?




[GitHub] [airflow] uranusjr commented on a diff in pull request #25419: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #25419:
URL: https://github.com/apache/airflow/pull/25419#discussion_r948635341


##########
airflow/datasets/manager.py:
##########
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from sqlalchemy.orm.session import Session
+
+from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.session import NEW_SESSION
+
+
+class DatasetEventManager(LoggingMixin):
+    """
+    A pluggable class that manages operations for dataset events.
+
+    The intent is to have one place to handle all DatasetEvent-related operations, so different
+    Airflow deployments can use plugins that broadcast dataset events to each other.
+    """
+
+    def register_dataset_change(
+        self, *, task_instance=None, dataset=None, extra=None, session: Session = NEW_SESSION
+    ) -> None:
+        """
+        For local datasets, look them up, record the dataset event, queue dagruns, and broadcast
+        the dataset event
+        """
+        from airflow.datasets import Dataset
+        from airflow.models.dataset import DatasetModel
+
+        if isinstance(dataset, Dataset):
+            dataset = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
+            if not dataset:
+                self.log.warning("Dataset %s not found", dataset)
+                return
+        session.add(
+            DatasetEvent(
+                dataset_id=dataset.id,
+                source_task_id=task_instance.task_id,
+                source_dag_id=task_instance.dag_id,
+                source_run_id=task_instance.run_id,
+                source_map_index=task_instance.map_index,
+                extra=extra,
+            )
+        )

Review Comment:
   This fails to handle `dataset=None` and `task_instance=None`. Are those valid in the first place?
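One way to handle the missing-input cases explicitly, sketched with local stand-ins for `Dataset` and `DatasetModel` and a `lookup` callable in place of the session query (all assumptions for illustration). It also keeps the lookup result in a separate variable: in the diff above, `dataset` is reassigned to the query result before the warning, so the "not found" message would log `None` rather than the URI.

```python
import logging
from dataclasses import dataclass

log = logging.getLogger("dataset_manager_sketch")


@dataclass(frozen=True)
class Dataset:
    uri: str


@dataclass
class DatasetModel:
    id: int
    uri: str


def resolve_dataset(dataset, lookup):
    """Return the DatasetModel for `dataset`, or None if it is not registered.

    `lookup` stands in for the session query: it maps a URI to a model or None.
    """
    if dataset is None:
        raise ValueError("dataset is required")
    if isinstance(dataset, DatasetModel):
        return dataset
    model = lookup(dataset.uri)  # keep the original `dataset` for the log message
    if model is None:
        log.warning("Dataset %s not found", dataset.uri)
    return model


models = {"s3://bucket/a": DatasetModel(id=1, uri="s3://bucket/a")}
print(resolve_dataset(Dataset("s3://bucket/a"), models.get))        # DatasetModel(id=1, uri='s3://bucket/a')
print(resolve_dataset(Dataset("s3://bucket/missing"), models.get))  # None, plus a warning naming the URI
```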





[GitHub] [airflow] jedcunningham commented on a diff in pull request #25419: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #25419:
URL: https://github.com/apache/airflow/pull/25419#discussion_r950603212


##########
tests/datasets/test_manager.py:
##########
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import mock
+
+import pytest
+
+from airflow.datasets import Dataset
+from airflow.datasets.manager import DatasetEventManager
+from airflow.models.dataset import DatasetModel
+
+
+@pytest.fixture()
+def mock_task_instance():
+    mock_ti = mock.Mock()
+    mock_ti.task_id = "5"
+    mock_ti.dag_id = "7"
+    mock_ti.run_id = "11"
+    mock_ti.map_index = "13"
+    return mock_ti
+
+
+def create_mock_dag():
+    n = 1
+    while True:
+        mock_dag = mock.Mock()
+        mock_dag.dag_id = n
+        n += 1
+        yield mock_dag
+
+
+class TestDatasetEventManager:
+    def test_register_dataset_change_dataset_doesnt_exist(self, mock_task_instance):
+        dsem = DatasetEventManager()
+
+        dataset = Dataset(uri="dataset_doesnt_exist")
+
+        mock_session = mock.Mock()
+        # Gotta mock up the query results
+        mock_session.query.return_value.filter.return_value.one_or_none.return_value = None
+
+        dsem.register_dataset_change(task_instance=mock_task_instance, dataset=dataset, session=mock_session)
+
+        # Ensure that we have ignored the dataset and _not_ created a DatasetEvent or
+        # DatasetDagRunQueue rows
+        mock_session.add.assert_not_called()
+        mock_session.merge.assert_not_called()
+
+    def test_register_dataset_change_with_dataset(self, mock_task_instance):
+        dsem = DatasetEventManager()
+
+        mock_dag_1 = mock.MagicMock()
+        mock_dag_1.dag_id = 1
+        mock_dag_2 = mock.MagicMock()
+        mock_dag_2.dag_id = 2
+
+        ds = Dataset("test_dataset_uri")
+
+        dsm = DatasetModel(uri=ds.uri)
+        dsm.consuming_dags = [mock_dag_1, mock_dag_2]
+
+        mock_session = mock.Mock()
+        # Gotta mock up the query results
+        mock_session.query.return_value.filter.return_value.one_or_none.return_value = dsm
+
+        dsem.register_dataset_change(task_instance=mock_task_instance, dataset=ds, session=mock_session)
+
+        # Ensure we've created a dataset
+        mock_session.add.assert_called_once()
+        # Ensure that we've created DatasetDagRunQueue rows
+        assert mock_session.merge.call_count == 2
+
+    def test_register_dataset_change_with_datasetmodel(self, mock_task_instance):

Review Comment:
   Maybe parameterize this test with the one before it, since they are identical except for Dataset/DatasetModel?
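A sketch of the suggested parametrization using `pytest.mark.parametrize`. To keep it self-contained, `Dataset`, `DatasetModel`, and `DatasetEventManager` are simplified local stand-ins that mimic the diff's behaviour (one `add` for the event, one `merge` per consuming DAG). The stand-in queries with `filter_by` rather than `filter`, so the dataclass needs no column attributes, and the mocked session is set up to match.

```python
from dataclasses import dataclass, field
from unittest import mock

import pytest


@dataclass(frozen=True)
class Dataset:
    uri: str


@dataclass
class DatasetModel:
    uri: str
    id: int = 1
    consuming_dags: list = field(default_factory=list)


class DatasetEventManager:
    """Stand-in: resolve a Dataset to its model, add one event, merge one queue row per consumer."""

    def register_dataset_change(self, *, task_instance, dataset, extra=None, session):
        if isinstance(dataset, Dataset):
            dataset = session.query(DatasetModel).filter_by(uri=dataset.uri).one_or_none()
            if dataset is None:
                return
        session.add(("event", dataset.id, task_instance.task_id, extra))
        for dag in dataset.consuming_dags:
            session.merge(("queue", dataset.id, dag.dag_id))


@pytest.mark.parametrize("use_model", [False, True], ids=["Dataset", "DatasetModel"])
def test_register_dataset_change(use_model):
    dsm = DatasetModel(uri="test_dataset_uri", consuming_dags=[mock.Mock(dag_id=1), mock.Mock(dag_id=2)])
    dataset = dsm if use_model else Dataset(dsm.uri)

    session = mock.Mock()
    # Only consulted on the Dataset path; the DatasetModel path skips the lookup.
    session.query.return_value.filter_by.return_value.one_or_none.return_value = dsm

    DatasetEventManager().register_dataset_change(
        task_instance=mock.Mock(task_id="5"), dataset=dataset, session=session
    )

    session.add.assert_called_once()
    assert session.merge.call_count == 2
    if use_model:
        session.query.assert_not_called()
```

The `ids` give each case a readable name in the pytest report.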



##########
airflow/datasets/manager.py:
##########
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from sqlalchemy.orm.session import Session
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class DatasetEventManager(LoggingMixin):
+    """
+    A pluggable class that manages operations for dataset events.
+
+    The intent is to have one place to handle all DatasetEvent-related operations, so different
+    Airflow deployments can use plugins that broadcast dataset events to each other.
+    """
+
+    def register_dataset_change(self, *, task_instance, dataset, extra=None, session: Session) -> None:

Review Comment:
   We should add type hints here for ti and dataset. I sorta think we can be picky, too, and just accept `DatasetModel`?
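A possible shape for the typed signature, assuming the parameter is narrowed to `DatasetModel` as suggested. Importing the types under `TYPE_CHECKING` and quoting the annotations avoids runtime imports (and potential import cycles) while type checkers still see the full names.

```python
from typing import TYPE_CHECKING, Any, Optional

if TYPE_CHECKING:
    # Only evaluated by type checkers, so this module loads without Airflow installed.
    from sqlalchemy.orm.session import Session

    from airflow.models.dataset import DatasetModel
    from airflow.models.taskinstance import TaskInstance


class DatasetEventManager:
    def register_dataset_change(
        self,
        *,
        task_instance: "TaskInstance",
        dataset: "DatasetModel",
        extra: Optional[dict[str, Any]] = None,
        session: "Session",
    ) -> None:
        """Record a change to `dataset` produced by `task_instance`."""
        return None
```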





[GitHub] [airflow] blag commented on a diff in pull request #25419: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
blag commented on code in PR #25419:
URL: https://github.com/apache/airflow/pull/25419#discussion_r951844432


##########
airflow/models/taskinstance.py:
##########
@@ -1525,33 +1528,17 @@ def _run_raw_task(
             session.add(Log(self.state, self))
             session.merge(self)
             if self.state == TaskInstanceState.SUCCESS:
-                self._create_dataset_dag_run_queue_records(session=session)
+                self._register_dataset_changes(session=session)
             session.commit()
 
-    def _create_dataset_dag_run_queue_records(self, *, session: Session) -> None:
-        from airflow.datasets import Dataset
-        from airflow.models.dataset import DatasetModel
-
+    def _register_dataset_changes(self, *, session: Session) -> None:
         for obj in self.task.outlets or []:
             self.log.debug("outlet obj %s", obj)
-            if isinstance(obj, Dataset):
-                dataset = session.query(DatasetModel).filter(DatasetModel.uri == obj.uri).one_or_none()
-                if not dataset:
-                    self.log.warning("Dataset %s not found", obj)
-                    continue
-                consuming_dag_ids = [x.dag_id for x in dataset.consuming_dags]
-                self.log.debug("consuming dag ids %s", consuming_dag_ids)
-                session.add(
-                    DatasetEvent(
-                        dataset_id=dataset.id,
-                        source_task_id=self.task_id,
-                        source_dag_id=self.dag_id,
-                        source_run_id=self.run_id,
-                        source_map_index=self.map_index,
-                    )
-                )
-                for dag_id in consuming_dag_ids:
-                    session.merge(DatasetDagRunQueue(dataset_id=dataset.id, target_dag_id=dag_id))
+            self.dataset_event_manager.register_dataset_change(

Review Comment:
   100%. Fixed in the next commit. Thanks!





[GitHub] [airflow] blag commented on a diff in pull request #25419: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
blag commented on code in PR #25419:
URL: https://github.com/apache/airflow/pull/25419#discussion_r949491029


##########
airflow/datasets/manager.py:
##########
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from sqlalchemy.orm.session import Session
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class DatasetEventManager(LoggingMixin):
+    """
+    A pluggable class that manages operations for dataset events.
+
+    The intent is to have one place to handle all DatasetEvent-related operations, so different
+    Airflow deployments can use plugins that broadcast dataset events to each other.
+    """
+
+    def register_dataset_change(self, *, task_instance, dataset, extra=None, session: Session) -> None:
+        """
+        For local datasets, look them up, record the dataset event, queue dagruns, and broadcast
+        the dataset event
+        """
+        from airflow.datasets import Dataset
+        from airflow.models.dataset import DatasetEvent, DatasetModel
+
+        if isinstance(dataset, Dataset):
+            dataset = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
+            if not dataset:
+                self.log.warning("Dataset %s not found", dataset)
+                return
+            session.add(
+                DatasetEvent(
+                    dataset_id=dataset.id,
+                    source_task_id=task_instance.task_id,
+                    source_dag_id=task_instance.dag_id,
+                    source_run_id=task_instance.run_id,
+                    source_map_index=task_instance.map_index,
+                    extra=extra,
+                )
+            )
+            self._queue_dagruns(dataset, session)

Review Comment:
   Fixed, and because I think the only other reasonable thing to pass in the `dataset` parameter is a DatasetModel, I added a test for that.
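The `_queue_dagruns` helper called in this diff isn't quoted here. Based on the logic removed from `TaskInstance` in the `taskinstance.py` diff, it plausibly merges one `DatasetDagRunQueue` row per consuming DAG. The sketch below is a module-level approximation with dataclass stand-ins for the ORM models and a mocked session; the removed code used `session.merge` rather than `add`, presumably so re-queuing an already-queued (dataset, DAG) pair doesn't insert a duplicate row.

```python
from dataclasses import dataclass, field
from unittest import mock


@dataclass
class DatasetModel:
    """Stand-in for the ORM model; only the fields used here."""
    id: int
    consuming_dags: list = field(default_factory=list)


@dataclass(frozen=True)
class DatasetDagRunQueue:
    """Stand-in for the ORM model of the same name."""
    dataset_id: int
    target_dag_id: str


def queue_dagruns(dataset, session):
    """Queue a run for every DAG that consumes `dataset` (uses merge, like the removed code)."""
    for dag in dataset.consuming_dags:
        session.merge(DatasetDagRunQueue(dataset_id=dataset.id, target_dag_id=dag.dag_id))


session = mock.Mock()
dataset = DatasetModel(id=7, consuming_dags=[mock.Mock(dag_id="a"), mock.Mock(dag_id="b")])
queue_dagruns(dataset, session)
```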





[GitHub] [airflow] blag commented on pull request #25419: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
blag commented on PR #25419:
URL: https://github.com/apache/airflow/pull/25419#issuecomment-1219071980

   Is `core.dataset_event_manager_class` the right place in the configuration to put this setting?




[GitHub] [airflow] ashb commented on a diff in pull request #25419: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #25419:
URL: https://github.com/apache/airflow/pull/25419#discussion_r948990643


##########
airflow/datasets/manager.py:
##########
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from sqlalchemy.orm.session import Session
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class DatasetEventManager(LoggingMixin):
+    """
+    A pluggable class that manages operations for dataset events.
+
+    The intent is to have one place to handle all DatasetEvent-related operations, so different
+    Airflow deployments can use plugins that broadcast dataset events to each other.
+    """
+
+    def register_dataset_change(self, *, task_instance, dataset, extra=None, session: Session) -> None:
+        """
+        For local datasets, look them up, record the dataset event, queue dagruns, and broadcast
+        the dataset event
+        """
+        from airflow.datasets import Dataset
+        from airflow.models.dataset import DatasetEvent, DatasetModel
+
+        if isinstance(dataset, Dataset):

Review Comment:
   When would a dataset not be a dataset? Shouldn't we do this check at the caller instead?





[GitHub] [airflow] jedcunningham commented on a diff in pull request #25419: WIP: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #25419:
URL: https://github.com/apache/airflow/pull/25419#discussion_r948496663


##########
airflow/datasets/manager.py:
##########
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from sqlalchemy.orm.session import Session
+
+from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.session import NEW_SESSION

Review Comment:
   ```suggestion
   ```



##########
airflow/datasets/manager.py:
##########
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from sqlalchemy.orm.session import Session
+
+from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.session import NEW_SESSION
+
+
+class DatasetEventManager(LoggingMixin):
+    """
+    A pluggable class that manages operations for dataset events.
+
+    The intent is to have one place to handle all DatasetEvent-related operations, so different
+    Airflow deployments can use plugins that broadcast dataset events to each other.
+    """
+
+    def register_dataset_change(
+        self, *, task_instance=None, dataset=None, extra=None, session: Session = NEW_SESSION

Review Comment:
   ```suggestion
           self, *, task_instance=None, dataset=None, extra=None, session: Session
   ```
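   Because `session` follows the bare `*` and has no default, it becomes a required keyword-only argument. Callers must pass it by name, and omitting it raises `TypeError` at call time. Without the suggestion, the method would silently run with a placeholder value. (Presumably the placeholder only gets replaced when a decorator such as `@provide_session` is applied, and the quoted method is not decorated.) The sketch below is plain Python. `object()` stands in for a SQLAlchemy `Session`, and the function body is a placeholder rather than the PR's logic.

```python
def register_dataset_change(*, task_instance=None, dataset=None, extra=None, session):
    # Everything after the bare `*` is keyword-only; `session` has no default,
    # so every caller must supply it explicitly.
    return session


try:
    register_dataset_change(dataset="s3://bucket/a")  # session omitted
except TypeError as err:
    print(f"TypeError: {err}")

sentinel_session = object()  # stand-in for a real Session
assert register_dataset_change(dataset="s3://bucket/a", session=sentinel_session) is sentinel_session
```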





[GitHub] [airflow] blag commented on a diff in pull request #25419: Create a pluggable DatasetEventManager

Posted by GitBox <gi...@apache.org>.
blag commented on code in PR #25419:
URL: https://github.com/apache/airflow/pull/25419#discussion_r949477537


##########
airflow/datasets/manager.py:
##########
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from sqlalchemy.orm.session import Session
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class DatasetEventManager(LoggingMixin):
+    """
+    A pluggable class that manages operations for dataset events.
+
+    The intent is to have one place to handle all DatasetEvent-related operations, so different
+    Airflow deployments can use plugins that broadcast dataset events to each other.
+    """
+
+    def register_dataset_change(self, *, task_instance, dataset, extra=None, session: Session) -> None:
+        """
+        For local datasets, look them up, record the dataset event, queue dagruns, and broadcast
+        the dataset event
+        """
+        from airflow.datasets import Dataset
+        from airflow.models.dataset import DatasetEvent, DatasetModel
+
+        if isinstance(dataset, Dataset):
+            dataset = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
+            if not dataset:
+                self.log.warning("Dataset %s not found", dataset)
+                return
+            session.add(
+                DatasetEvent(
+                    dataset_id=dataset.id,
+                    source_task_id=task_instance.task_id,
+                    source_dag_id=task_instance.dag_id,
+                    source_run_id=task_instance.run_id,
+                    source_map_index=task_instance.map_index,
+                    extra=extra,
+                )
+            )
+            self._queue_dagruns(dataset, session)

Review Comment:
   The [original code](https://github.com/apache/airflow/pull/25419/files#diff-62f7d8a52fefdb8e05d4f040c6d3459b4a56fe46976c24f68843dbaeb5a98487L1537) ignored anything that wasn't a Dataset. I didn't want to change the behavior during the refactor.
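   To make the preserved behavior concrete, here is a self-contained sketch of the refactored method's control flow. Plain dataclasses and in-memory lists stand in for `DatasetModel`, `DatasetEvent`, `DatasetDagRunQueue`, and the SQLAlchemy session. `InMemoryDatasetEventManager` and its fields are hypothetical and are not Airflow's API.

   One deliberate difference from the quoted diff: the lookup result gets its own name, `dataset_model`, so the "not found" warning can still log the requested URI. In the diff, `dataset` is rebound to the query result, which is `None` by the time the warning runs.

```python
import logging
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

log = logging.getLogger(__name__)


@dataclass(frozen=True)
class Dataset:  # stand-in for airflow.datasets.Dataset
    uri: str


@dataclass
class DatasetModel:  # stand-in for the ORM row
    id: int
    uri: str


@dataclass
class TaskInstance:  # stand-in carrying only the fields the event needs
    dag_id: str
    task_id: str
    run_id: str
    map_index: int = -1


@dataclass
class InMemoryDatasetEventManager:
    """Hypothetical stand-in: dicts and lists replace the DB session and tables."""

    datasets_by_uri: Dict[str, DatasetModel]
    events: List[Dict[str, Any]] = field(default_factory=list)
    queued: List[int] = field(default_factory=list)

    def register_dataset_change(
        self, *, task_instance: TaskInstance, dataset: Any, extra: Optional[dict] = None
    ) -> None:
        # Preserve the pre-refactor behavior: silently ignore anything that is not a Dataset.
        if not isinstance(dataset, Dataset):
            return
        # Separate name for the lookup result, so `dataset.uri` is still usable in the warning.
        dataset_model = self.datasets_by_uri.get(dataset.uri)
        if dataset_model is None:
            log.warning("Dataset %s not found", dataset.uri)
            return
        self.events.append(
            {
                "dataset_id": dataset_model.id,
                "source_task_id": task_instance.task_id,
                "source_dag_id": task_instance.dag_id,
                "source_run_id": task_instance.run_id,
                "source_map_index": task_instance.map_index,
                "extra": extra,
            }
        )
        self._queue_dagruns(dataset_model)

    def _queue_dagruns(self, dataset_model: DatasetModel) -> None:
        # Placeholder for inserting DatasetDagRunQueue rows for consuming DAGs.
        self.queued.append(dataset_model.id)


manager = InMemoryDatasetEventManager(
    datasets_by_uri={"s3://bucket/a": DatasetModel(id=1, uri="s3://bucket/a")}
)
ti = TaskInstance(dag_id="producer", task_id="write", run_id="manual__1")
manager.register_dataset_change(task_instance=ti, dataset=Dataset("s3://bucket/a"))
manager.register_dataset_change(task_instance=ti, dataset="s3://bucket/a")  # not a Dataset: ignored
manager.register_dataset_change(task_instance=ti, dataset=Dataset("s3://bucket/missing"))  # logged, skipped
print(len(manager.events), manager.queued)  # 1 [1]
```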


