Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/22 20:29:16 UTC

[GitHub] [airflow] kaxil commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing

kaxil commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r656498989



##########
File path: airflow/config_templates/default_airflow.cfg
##########
@@ -849,6 +849,10 @@ job_heartbeat_sec = 5
 # that no longer have a matching DagRun
 clean_tis_without_dagrun_interval = 15.0
 
+# How often (in seconds) to check and fail tis and dagruns whose corresponding

Review comment:
       ```suggestion
   # How often (in seconds) to check and fail task instances and dagruns whose corresponding
   ```
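The option describes a periodic check gated by an interval in seconds. Below is a minimal sketch of how such an interval can gate a cleanup step inside a loop, assuming a monotonic clock. `IntervalGate` and `tick` are hypothetical names and are not Airflow's scheduler API. The value 15.0 simply mirrors the `clean_tis_without_dagrun_interval` default shown in the diff.

```python
import time
from typing import Callable, List, Optional


class IntervalGate:
    """Run ``action`` at most once every ``interval_secs`` seconds (hypothetical helper)."""

    def __init__(self, interval_secs: float, action: Callable[[], None]) -> None:
        self.interval_secs = interval_secs
        self.action = action
        self._last_run: Optional[float] = None

    def tick(self, now: Optional[float] = None) -> bool:
        """Call on every loop iteration; returns True if the action ran this time."""
        now = time.monotonic() if now is None else now
        if self._last_run is None or now - self._last_run >= self.interval_secs:
            self.action()
            self._last_run = now
            return True
        return False


runs: List[str] = []
gate = IntervalGate(15.0, lambda: runs.append("cleanup"))
# Simulated loop timestamps: the cleanup fires at t=0, t=15 and t=30 only.
print([gate.tick(now=t) for t in (0.0, 5.0, 14.9, 15.0, 20.0, 30.0)])
# [True, False, False, True, False, True]
```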

##########
File path: airflow/config_templates/config.yml
##########
@@ -1704,6 +1704,14 @@
       type: float
       example: ~
       default: "15.0"
+    - name: missing_dag_cleanup_interval
+      description: |
+        How often (in seconds) to check and fail tis and dagruns whose corresponding

Review comment:
       ```suggestion
           How often (in seconds) to check and fail task instances and dagruns whose corresponding
   ```

##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,31 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        if dag_id in self.dags:
+            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+            if (
+                dag_id in self.dags_last_fetched
+                and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
+            ):
+                sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
+                    dag_id=dag_id,
+                    session=session,
+                )
+                if not sd_last_updated_datetime:
+                    return False
+                return not sd_last_updated_datetime > self.dags_last_fetched[dag_id]

Review comment:
       Why do we want this case?
   
   This means that if our cached copy of the DAG is stale, it returns False.
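To make the concern concrete, here is the branch's decision pulled out into a standalone function. This is a sketch only: `cached_copy_is_current` is a hypothetical name, and `sd_last_updated` stands in for the value returned by `SerializedDagModel.get_last_updated_datetime` (with `None` meaning the serialized row is gone). The third call shows a DAG that still exists in the database being reported as missing only because the cached copy is older.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def cached_copy_is_current(last_fetched: datetime, sd_last_updated: Optional[datetime]) -> bool:
    """Hypothetical standalone mirror of the branch under review."""
    if not sd_last_updated:
        return False  # serialized DAG row no longer exists
    # Same expression as the diff: False whenever the DB copy is newer than the cache.
    return not sd_last_updated > last_fetched


fetched = datetime(2020, 1, 5, tzinfo=timezone.utc)
print(cached_copy_is_current(fetched, None))                            # False: DAG deleted
print(cached_copy_is_current(fetched, fetched - timedelta(minutes=1)))  # True: cache up to date
print(cached_copy_is_current(fetched, fetched + timedelta(minutes=1)))  # False: DAG exists, cache stale
```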
   
   

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2124,6 +2124,59 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun(
             assert ti.start_date == ti.end_date
             assert ti.duration is not None
 
+    def test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self):
+        """This tests that if dags are missing in dagbag, then it should be failed"""
+        session = settings.Session()
+        dag_id = 'test_execute_helper_should_fail_tasks_with_missing_dag'
+        dag = DAG(dag_id, start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = BashOperator(task_id='op1', bash_command='sleep 5')
+
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
+
+        # Mock dagbag.has_dag
+        mock_has_dag = mock.MagicMock(return_value=False)
+        dagbag.has_dag = mock_has_dag
+
+        dag = DagBag(read_dags_from_db=True, include_examples=False).get_dag(dag_id)
+        # Create DAG run with FAILED state

Review comment:
       The DagRun is not created in the FAILED state (it is created with `State.RUNNING`), so this comment might need updating.

##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,31 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        if dag_id in self.dags:
+            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+            if (
+                dag_id in self.dags_last_fetched
+                and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
+            ):
+                sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
+                    dag_id=dag_id,
+                    session=session,
+                )
+                if not sd_last_updated_datetime:
+                    return False

Review comment:
       ```suggestion
                       self.log.warning("Serialized DAG %s no longer exists", dag_id)
                       del self.dags[dag_id]
                       del self.dags_last_fetched[dag_id]
                       del self.dags_hash[dag_id]
                       return False
   ```

##########
File path: tests/models/test_dagbag.py
##########
@@ -859,6 +859,39 @@ def test_get_dag_with_dag_serialization(self):
         assert set(updated_ser_dag_1.tags) == {"example", "example2", "new_tag"}
         assert updated_ser_dag_1_update_time > ser_dag_1_update_time
 
+    @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL", 5)
+    def test_has_dag_method(self):
+        """Test has_dag method"""
+        dag_id = "example_bash_operator"
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)):
+            example_bash_op_dag = DagBag(include_examples=True).dags.get("example_bash_operator")
+            SerializedDagModel.write_dag(dag=example_bash_op_dag)
+
+            dag_bag = DagBag(read_dags_from_db=True)
+            dag_bag.get_dag(dag_id)  # Add dag to self.dags
+            assert dag_bag.has_dag(dag_id)
+            assert not dag_bag.has_dag("non_added_dag_id")
+
+        # Check that min_serialized_dag_fetch_interval has passed but
+        # dag is in SerializedDagModel and we return True
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 4)):
+            assert dag_bag.has_dag(dag_id)

Review comment:
       `min_serialized_dag_fetch_interval` hasn't passed though: it has only been 4 seconds, but `min_serialized_dag_fetch_interval` is set to 5 seconds, no?
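The arithmetic behind this comment can be checked directly. The sketch assumes `dags_last_fetched[dag_id]` was recorded at the frozen `00:00:00` timestamp when `get_dag` ran, and reuses the strict `>` comparison from `has_dag`; `interval_has_passed` is a hypothetical helper and naive datetimes are used for brevity. Because the comparison is strict, even exactly 5 seconds does not reach the database branch, so a timestamp after `00:00:05` would be needed to exercise it.

```python
from datetime import datetime, timedelta

MIN_SERIALIZED_DAG_FETCH_INTERVAL = 5  # value patched in the test


def interval_has_passed(last_fetched: datetime, now: datetime) -> bool:
    # Same comparison as has_dag: strictly greater than last_fetched + interval.
    return now > last_fetched + timedelta(seconds=MIN_SERIALIZED_DAG_FETCH_INTERVAL)


last_fetched = datetime(2020, 1, 5, 0, 0, 0)
print(interval_has_passed(last_fetched, datetime(2020, 1, 5, 0, 0, 4)))  # False
print(interval_has_passed(last_fetched, datetime(2020, 1, 5, 0, 0, 5)))  # False (strict >)
print(interval_has_passed(last_fetched, datetime(2020, 1, 5, 0, 0, 6)))  # True
```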

##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,31 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        if dag_id in self.dags:
+            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+            if (
+                dag_id in self.dags_last_fetched
+                and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
+            ):
+                sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
+                    dag_id=dag_id,
+                    session=session,
+                )
+                if not sd_last_updated_datetime:
+                    return False

Review comment:
       This removes the DAG from the DagBag since it no longer exists, so that next time we return False faster, as the DAG won't be in the DagBag.
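The eviction keeps the three per-DAG dicts consistent with each other. Here is a minimal sketch using a hypothetical `TinyDagCache` stand-in with the same three attribute names the suggestion touches (`dags`, `dags_last_fetched`, `dags_hash`). It uses `dict.pop(key, None)` as a defensive variant of the suggested `del` statements; whether a key can actually be missing from one of the real `DagBag` dicts is not established here.

```python
from datetime import datetime, timezone
from typing import Any, Dict


class TinyDagCache:
    """Hypothetical stand-in for the three per-DAG dicts the suggestion touches."""

    def __init__(self) -> None:
        self.dags: Dict[str, Any] = {}
        self.dags_last_fetched: Dict[str, datetime] = {}
        self.dags_hash: Dict[str, str] = {}

    def add(self, dag_id: str, dag: Any, dag_hash: str) -> None:
        self.dags[dag_id] = dag
        self.dags_last_fetched[dag_id] = datetime.now(timezone.utc)
        self.dags_hash[dag_id] = dag_hash

    def forget(self, dag_id: str) -> None:
        # Same effect as the three `del` statements in the suggestion; pop() with a
        # default also tolerates an id that is missing from one of the dicts.
        for store in (self.dags, self.dags_last_fetched, self.dags_hash):
            store.pop(dag_id, None)


cache = TinyDagCache()
cache.add("example_bash_operator", object(), "abc123")
cache.forget("example_bash_operator")
# The id is no longer cached, so a later has_dag() call skips the cached branch.
print("example_bash_operator" in cache.dags)  # False
```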

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2124,6 +2124,59 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun(
             assert ti.start_date == ti.end_date
             assert ti.duration is not None
 
+    def test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self):
+        """This tests that if dags are missing in dagbag, then it should be failed"""
+        session = settings.Session()
+        dag_id = 'test_execute_helper_should_fail_tasks_with_missing_dag'
+        dag = DAG(dag_id, start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = BashOperator(task_id='op1', bash_command='sleep 5')
+
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
+
+        # Mock dagbag.has_dag
+        mock_has_dag = mock.MagicMock(return_value=False)
+        dagbag.has_dag = mock_has_dag
+
+        dag = DagBag(read_dags_from_db=True, include_examples=False).get_dag(dag_id)
+        # Create DAG run with FAILED state
+        dag.clear()
+        dr = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE + timedelta(days=1),
+            start_date=DEFAULT_DATE + timedelta(days=1),
+            session=session,
+        )
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti.state = State.RUNNING
+        session.commit()
+
+        # This poll interval is large, bug the scheduler doesn't sleep that
+        # long, instead we hit the missing_dag_cleanup_interval instead
+        self.scheduler_job = SchedulerJob(num_runs=2, processor_poll_interval=30)
+        self.scheduler_job.dagbag = dagbag
+        executor = MockExecutor(do_update=False)
+        executor.queued_tasks

Review comment:
       This is a no-op! Is it needed?

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2124,6 +2124,59 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun(
             assert ti.start_date == ti.end_date
             assert ti.duration is not None
 
+    def test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self):
+        """This tests that if dags are missing in dagbag, then it should be failed"""
+        session = settings.Session()
+        dag_id = 'test_execute_helper_should_fail_tasks_with_missing_dag'
+        dag = DAG(dag_id, start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = BashOperator(task_id='op1', bash_command='sleep 5')
+
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
+
+        # Mock dagbag.has_dag
+        mock_has_dag = mock.MagicMock(return_value=False)
+        dagbag.has_dag = mock_has_dag
+
+        dag = DagBag(read_dags_from_db=True, include_examples=False).get_dag(dag_id)
+        # Create DAG run with FAILED state
+        dag.clear()
+        dr = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE + timedelta(days=1),
+            start_date=DEFAULT_DATE + timedelta(days=1),
+            session=session,
+        )
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti.state = State.RUNNING
+        session.commit()
+
+        # This poll interval is large, bug the scheduler doesn't sleep that

Review comment:
       ```suggestion
           # This poll interval is large, but the scheduler doesn't sleep that
   ```

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2124,6 +2124,59 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun(
             assert ti.start_date == ti.end_date
             assert ti.duration is not None
 
+    def test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self):
+        """This tests that if dags are missing in dagbag, then it should be failed"""
+        session = settings.Session()
+        dag_id = 'test_execute_helper_should_fail_tasks_with_missing_dag'
+        dag = DAG(dag_id, start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = BashOperator(task_id='op1', bash_command='sleep 5')
+
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
+
+        # Mock dagbag.has_dag
+        mock_has_dag = mock.MagicMock(return_value=False)
+        dagbag.has_dag = mock_has_dag

Review comment:
       ```suggestion
           dagbag.has_dag = mock.MagicMock(return_value=False)
   ```
   
   as `mock_has_dag` isn't used anywhere else
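Both forms below are standard `unittest.mock` usage. `FakeDagBag` is a hypothetical stand-in for `DagBag`. Assigning a `MagicMock` directly, as suggested, is fine for a throwaway instance. `mock.patch.object` is an alternative that removes the replacement automatically when the block exits.

```python
from unittest import mock


class FakeDagBag:
    """Hypothetical stand-in for DagBag; only has_dag matters here."""

    def has_dag(self, dag_id: str) -> bool:
        return True


# Option 1 (the suggestion): shadow the method on this one instance.
dagbag = FakeDagBag()
dagbag.has_dag = mock.MagicMock(return_value=False)
assert dagbag.has_dag("some_dag") is False
dagbag.has_dag.assert_called_once_with("some_dag")

# Option 2: patch.object forwards return_value to the MagicMock it creates
# and restores the original method when the with-block exits.
other = FakeDagBag()
with mock.patch.object(other, "has_dag", return_value=False) as fake_has_dag:
    assert other.has_dag("some_dag") is False
fake_has_dag.assert_called_once_with("some_dag")
assert other.has_dag("some_dag") is True  # real method is back
```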

##########
File path: tests/models/test_dagbag.py
##########
@@ -859,6 +859,39 @@ def test_get_dag_with_dag_serialization(self):
         assert set(updated_ser_dag_1.tags) == {"example", "example2", "new_tag"}
         assert updated_ser_dag_1_update_time > ser_dag_1_update_time
 
+    @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL", 5)
+    def test_has_dag_method(self):
+        """Test has_dag method"""
+        dag_id = "example_bash_operator"
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)):
+            example_bash_op_dag = DagBag(include_examples=True).dags.get("example_bash_operator")

Review comment:
       ```suggestion
               example_bash_op_dag = DagBag(include_examples=True).dags.get(dag_id)
   ```

##########
File path: airflow/config_templates/config.yml
##########
@@ -1704,6 +1704,14 @@
       type: float
       example: ~
       default: "15.0"
+    - name: missing_dag_cleanup_interval
+      description: |
+        How often (in seconds) to check and fail tis and dagruns whose corresponding

Review comment:
       How about `clean_tis_without_dags` instead, to make it similar to `clean_tis_without_dagrun_interval`?
   
   No strong opinion, though.

##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,31 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        if dag_id in self.dags:
+            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+            if (
+                dag_id in self.dags_last_fetched
+                and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
+            ):
+                sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
+                    dag_id=dag_id,
+                    session=session,
+                )
+                if not sd_last_updated_datetime:
+                    return False
+                return not sd_last_updated_datetime > self.dags_last_fetched[dag_id]
+
+            return SerializedDagModel.has_dag(dag_id=dag_id, session=session)

Review comment:
       ```suggestion
   ```
   
   This is redundant: if the function has not returned yet, the line below will be executed anyway.
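A minimal sketch of why removing the inner `return` does not change behavior. It assumes, as the comment implies, that the statement after the `if dag_id in self.dags:` block is the same `SerializedDagModel.has_dag` lookup. The function names, the `stale_flags` dict (standing in for the early-return freshness checks), and the `db_has_dag` callable (standing in for the database lookup) are all hypothetical.

```python
from typing import Callable, Dict


def with_inner_return(dag_id: str, stale_flags: Dict[str, bool],
                      db_has_dag: Callable[[str], bool]) -> bool:
    if dag_id in stale_flags:
        if stale_flags[dag_id]:
            return False  # stands in for the freshness checks that return early
        return db_has_dag(dag_id)  # the line the suggestion removes
    return db_has_dag(dag_id)


def without_inner_return(dag_id: str, stale_flags: Dict[str, bool],
                         db_has_dag: Callable[[str], bool]) -> bool:
    if dag_id in stale_flags:
        if stale_flags[dag_id]:
            return False
    # Cached-but-fresh and uncached ids both fall through to the same lookup.
    return db_has_dag(dag_id)


flags = {"stale_dag": True, "fresh_dag": False}
for db_result in (True, False):
    for dag_id in ("stale_dag", "fresh_dag", "uncached_dag"):
        a = with_inner_return(dag_id, flags, lambda _id, r=db_result: r)
        b = without_inner_return(dag_id, flags, lambda _id, r=db_result: r)
        assert a == b, (dag_id, db_result)
print("both versions agree")
```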




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org