You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/22 20:29:16 UTC
[GitHub] [airflow] kaxil commented on a change in pull request #16182: Do not queue tasks when the DAG goes missing
kaxil commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r656498989
##########
File path: airflow/config_templates/default_airflow.cfg
##########
@@ -849,6 +849,10 @@ job_heartbeat_sec = 5
# that no longer have a matching DagRun
clean_tis_without_dagrun_interval = 15.0
+# How often (in seconds) to check and fail tis and dagruns whose corresponding
Review comment:
```suggestion
# How often (in seconds) to check and fail task instances and dagruns whose corresponding
```
##########
File path: airflow/config_templates/config.yml
##########
@@ -1704,6 +1704,14 @@
type: float
example: ~
default: "15.0"
+ - name: missing_dag_cleanup_interval
+ description: |
+ How often (in seconds) to check and fail tis and dagruns whose corresponding
Review comment:
```suggestion
How often (in seconds) to check and fail task instances and dagruns whose corresponding
```
##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,31 @@ def dag_ids(self) -> List[str]:
"""
return list(self.dags.keys())
+ @provide_session
+ def has_dag(self, dag_id: str, session: Session = None):
+ """
+ Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+ else check in the DB if the dag exists, return True, False otherwise
+ """
+ from airflow.models.serialized_dag import SerializedDagModel
+
+ if dag_id in self.dags:
+ min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+ if (
+ dag_id in self.dags_last_fetched
+ and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
+ ):
+ sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
+ dag_id=dag_id,
+ session=session,
+ )
+ if not sd_last_updated_datetime:
+ return False
+ return not sd_last_updated_datetime > self.dags_last_fetched[dag_id]
Review comment:
Why do we want this case?
This means that if the cached DAG is stale (i.e. the serialized DAG was updated after we last fetched it), `has_dag` returns False.
##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2124,6 +2124,59 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun(
assert ti.start_date == ti.end_date
assert ti.duration is not None
+ def test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self):
+ """This tests that if dags are missing in dagbag, then it should be failed"""
+ session = settings.Session()
+ dag_id = 'test_execute_helper_should_fail_tasks_with_missing_dag'
+ dag = DAG(dag_id, start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+ with dag:
+ op1 = BashOperator(task_id='op1', bash_command='sleep 5')
+
+ # Write Dag to DB
+ dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+ dagbag.bag_dag(dag, root_dag=dag)
+ dagbag.sync_to_db()
+
+ # Mock dagbag.has_dag
+ mock_has_dag = mock.MagicMock(return_value=False)
+ dagbag.has_dag = mock_has_dag
+
+ dag = DagBag(read_dags_from_db=True, include_examples=False).get_dag(dag_id)
+ # Create DAG run with FAILED state
Review comment:
The DagRun is not created with the FAILED state, so this comment might need updating.
##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,31 @@ def dag_ids(self) -> List[str]:
"""
return list(self.dags.keys())
+ @provide_session
+ def has_dag(self, dag_id: str, session: Session = None):
+ """
+ Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+ else check in the DB if the dag exists, return True, False otherwise
+ """
+ from airflow.models.serialized_dag import SerializedDagModel
+
+ if dag_id in self.dags:
+ min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+ if (
+ dag_id in self.dags_last_fetched
+ and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
+ ):
+ sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
+ dag_id=dag_id,
+ session=session,
+ )
+ if not sd_last_updated_datetime:
+ return False
Review comment:
```suggestion
self.log.warning("Serialized DAG %s no longer exists", dag_id)
del self.dags[dag_id]
del self.dags_last_fetched[dag_id]
del self.dags_hash[dag_id]
return False
```
##########
File path: tests/models/test_dagbag.py
##########
@@ -859,6 +859,39 @@ def test_get_dag_with_dag_serialization(self):
assert set(updated_ser_dag_1.tags) == {"example", "example2", "new_tag"}
assert updated_ser_dag_1_update_time > ser_dag_1_update_time
+ @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL", 5)
+ def test_has_dag_method(self):
+ """Test has_dag method"""
+ dag_id = "example_bash_operator"
+ with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)):
+ example_bash_op_dag = DagBag(include_examples=True).dags.get("example_bash_operator")
+ SerializedDagModel.write_dag(dag=example_bash_op_dag)
+
+ dag_bag = DagBag(read_dags_from_db=True)
+ dag_bag.get_dag(dag_id) # Add dag to self.dags
+ assert dag_bag.has_dag(dag_id)
+ assert not dag_bag.has_dag("non_added_dag_id")
+
+ # Check that min_serialized_dag_fetch_interval has passed but
+ # dag is in SerializedDagModel and we return True
+ with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 4)):
+ assert dag_bag.has_dag(dag_id)
Review comment:
`min_serialized_dag_fetch_interval` hasn't passed though: it has only been 4 seconds, but `min_serialized_dag_fetch_interval` is set to 5 seconds, no?
##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,31 @@ def dag_ids(self) -> List[str]:
"""
return list(self.dags.keys())
+ @provide_session
+ def has_dag(self, dag_id: str, session: Session = None):
+ """
+ Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+ else check in the DB if the dag exists, return True, False otherwise
+ """
+ from airflow.models.serialized_dag import SerializedDagModel
+
+ if dag_id in self.dags:
+ min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+ if (
+ dag_id in self.dags_last_fetched
+ and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
+ ):
+ sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
+ dag_id=dag_id,
+ session=session,
+ )
+ if not sd_last_updated_datetime:
+ return False
Review comment:
This removes the DAG from the DagBag since it no longer exists, so that next time we return False faster, as the DAG won't be in the DagBag.
##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2124,6 +2124,59 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun(
assert ti.start_date == ti.end_date
assert ti.duration is not None
+ def test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self):
+ """This tests that if dags are missing in dagbag, then it should be failed"""
+ session = settings.Session()
+ dag_id = 'test_execute_helper_should_fail_tasks_with_missing_dag'
+ dag = DAG(dag_id, start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+ with dag:
+ op1 = BashOperator(task_id='op1', bash_command='sleep 5')
+
+ # Write Dag to DB
+ dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+ dagbag.bag_dag(dag, root_dag=dag)
+ dagbag.sync_to_db()
+
+ # Mock dagbag.has_dag
+ mock_has_dag = mock.MagicMock(return_value=False)
+ dagbag.has_dag = mock_has_dag
+
+ dag = DagBag(read_dags_from_db=True, include_examples=False).get_dag(dag_id)
+ # Create DAG run with FAILED state
+ dag.clear()
+ dr = dag.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ state=State.RUNNING,
+ execution_date=DEFAULT_DATE + timedelta(days=1),
+ start_date=DEFAULT_DATE + timedelta(days=1),
+ session=session,
+ )
+ ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+ ti.state = State.RUNNING
+ session.commit()
+
+ # This poll interval is large, bug the scheduler doesn't sleep that
+ # long, instead we hit the missing_dag_cleanup_interval instead
+ self.scheduler_job = SchedulerJob(num_runs=2, processor_poll_interval=30)
+ self.scheduler_job.dagbag = dagbag
+ executor = MockExecutor(do_update=False)
+ executor.queued_tasks
Review comment:
No-op! Not needed?
##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2124,6 +2124,59 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun(
assert ti.start_date == ti.end_date
assert ti.duration is not None
+ def test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self):
+ """This tests that if dags are missing in dagbag, then it should be failed"""
+ session = settings.Session()
+ dag_id = 'test_execute_helper_should_fail_tasks_with_missing_dag'
+ dag = DAG(dag_id, start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+ with dag:
+ op1 = BashOperator(task_id='op1', bash_command='sleep 5')
+
+ # Write Dag to DB
+ dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+ dagbag.bag_dag(dag, root_dag=dag)
+ dagbag.sync_to_db()
+
+ # Mock dagbag.has_dag
+ mock_has_dag = mock.MagicMock(return_value=False)
+ dagbag.has_dag = mock_has_dag
+
+ dag = DagBag(read_dags_from_db=True, include_examples=False).get_dag(dag_id)
+ # Create DAG run with FAILED state
+ dag.clear()
+ dr = dag.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ state=State.RUNNING,
+ execution_date=DEFAULT_DATE + timedelta(days=1),
+ start_date=DEFAULT_DATE + timedelta(days=1),
+ session=session,
+ )
+ ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+ ti.state = State.RUNNING
+ session.commit()
+
+ # This poll interval is large, bug the scheduler doesn't sleep that
Review comment:
```suggestion
# This poll interval is large, but the scheduler doesn't sleep that
```
##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2124,6 +2124,59 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun(
assert ti.start_date == ti.end_date
assert ti.duration is not None
+ def test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self):
+ """This tests that if dags are missing in dagbag, then it should be failed"""
+ session = settings.Session()
+ dag_id = 'test_execute_helper_should_fail_tasks_with_missing_dag'
+ dag = DAG(dag_id, start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+ with dag:
+ op1 = BashOperator(task_id='op1', bash_command='sleep 5')
+
+ # Write Dag to DB
+ dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+ dagbag.bag_dag(dag, root_dag=dag)
+ dagbag.sync_to_db()
+
+ # Mock dagbag.has_dag
+ mock_has_dag = mock.MagicMock(return_value=False)
+ dagbag.has_dag = mock_has_dag
Review comment:
```suggestion
dagbag.has_dag = mock.MagicMock(return_value=False)
```
as `mock_has_dag` isn't used anywhere else
##########
File path: tests/models/test_dagbag.py
##########
@@ -859,6 +859,39 @@ def test_get_dag_with_dag_serialization(self):
assert set(updated_ser_dag_1.tags) == {"example", "example2", "new_tag"}
assert updated_ser_dag_1_update_time > ser_dag_1_update_time
+ @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL", 5)
+ def test_has_dag_method(self):
+ """Test has_dag method"""
+ dag_id = "example_bash_operator"
+ with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)):
+ example_bash_op_dag = DagBag(include_examples=True).dags.get("example_bash_operator")
Review comment:
```suggestion
example_bash_op_dag = DagBag(include_examples=True).dags.get(dag_id)
```
##########
File path: airflow/config_templates/config.yml
##########
@@ -1704,6 +1704,14 @@
type: float
example: ~
default: "15.0"
+ - name: missing_dag_cleanup_interval
+ description: |
+ How often (in seconds) to check and fail tis and dagruns whose corresponding
Review comment:
How about `clean_tis_without_dags` instead, to make it similar to `clean_tis_without_dagrun_interval`?
No strong opinion though.
##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,31 @@ def dag_ids(self) -> List[str]:
"""
return list(self.dags.keys())
+ @provide_session
+ def has_dag(self, dag_id: str, session: Session = None):
+ """
+ Checks the "local" cache, if it exists, and is not older than the configured cache time, return True
+ else check in the DB if the dag exists, return True, False otherwise
+ """
+ from airflow.models.serialized_dag import SerializedDagModel
+
+ if dag_id in self.dags:
+ min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+ if (
+ dag_id in self.dags_last_fetched
+ and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
+ ):
+ sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
+ dag_id=dag_id,
+ session=session,
+ )
+ if not sd_last_updated_datetime:
+ return False
+ return not sd_last_updated_datetime > self.dags_last_fetched[dag_id]
+
+ return SerializedDagModel.has_dag(dag_id=dag_id, session=session)
Review comment:
```suggestion
```
This is sort of redundant: if the function has not returned yet, the line below will be executed anyway.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org