You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2022/09/09 13:04:31 UTC

[airflow] branch main updated: Don't blow up when a task produces a dataset that is not consumed. (#26257)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fe82e96095 Don't blow up when a task produces a dataset that is not consumed. (#26257)
fe82e96095 is described below

commit fe82e9609522fa302d3eaf7193689718571d4af4
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Fri Sep 9 06:04:16 2022 -0700

    Don't blow up when a task produces a dataset that is not consumed. (#26257)
    
    If you have a dataset outlet on a task, and no DAG was recorded as
    consuming that dataset it failed with a null value constraint violation
    in the db
---
 airflow/datasets/manager.py    |  7 +++----
 tests/datasets/test_manager.py | 14 ++++++++++++++
 2 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/airflow/datasets/manager.py b/airflow/datasets/manager.py
index 2b009c1b09..83539d5965 100644
--- a/airflow/datasets/manager.py
+++ b/airflow/datasets/manager.py
@@ -61,7 +61,9 @@ class DatasetManager(LoggingMixin):
                 extra=extra,
             )
         )
-        self._queue_dagruns(dataset_model, session)
+        if dataset_model.consuming_dags:
+            self._queue_dagruns(dataset_model, session)
+        session.flush()
 
     def _queue_dagruns(self, dataset: DatasetModel, session: Session) -> None:
         # Possible race condition: if multiple dags or multiple (usually
@@ -91,8 +93,6 @@ class DatasetManager(LoggingMixin):
             except exc.IntegrityError:
                 self.log.debug("Skipping record %s", item, exc_info=True)
 
-        session.flush()
-
     def _postgres_queue_dagruns(self, dataset: DatasetModel, session: Session) -> None:
         from sqlalchemy.dialects.postgresql import insert
 
@@ -101,7 +101,6 @@ class DatasetManager(LoggingMixin):
             stmt,
             [{'target_dag_id': target_dag.dag_id} for target_dag in dataset.consuming_dags],
         )
-        session.flush()
 
 
 def resolve_dataset_manager() -> "DatasetManager":
diff --git a/tests/datasets/test_manager.py b/tests/datasets/test_manager.py
index 4ff3b28847..42dffd76a1 100644
--- a/tests/datasets/test_manager.py
+++ b/tests/datasets/test_manager.py
@@ -80,3 +80,17 @@ class TestDatasetManager:
         # Ensure we've created a dataset
         assert session.query(DatasetEvent).filter_by(dataset_id=dsm.id).count() == 1
         assert session.query(DatasetDagRunQueue).count() == 2
+
+    def test_register_dataset_change_no_downstreams(self, session, mock_task_instance):
+        dsem = DatasetManager()
+
+        ds = Dataset(uri="never_consumed")
+        dsm = DatasetModel(uri="never_consumed")
+        session.add(dsm)
+        session.flush()
+
+        dsem.register_dataset_change(task_instance=mock_task_instance, dataset=ds, session=session)
+
+        # Ensure we've created a dataset
+        assert session.query(DatasetEvent).filter_by(dataset_id=dsm.id).count() == 1
+        assert session.query(DatasetDagRunQueue).count() == 0