Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/09/10 12:13:25 UTC

[GitHub] [airflow] uranusjr opened a new pull request #18141: Fix quarantined tests affected by AIP-39

uranusjr opened a new pull request #18141:
URL: https://github.com/apache/airflow/pull/18141


   When we migrated `TaskInstance` from `execution_date` to `run_id`, we forgot to check the quarantined tests. This fixes them.
   
   Note that this does _not_ make the quarantined tests pass (although some do pass now); it only modifies them to fit the test setup required by AIP-39, namely: 1. `TaskInstance.execution_date` is now fetched via a `JOIN` to `DagRun`, and 2. an associated `DagRun` is now required for every `TaskInstance`.
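   
   As illustration, here is a minimal, self-contained sketch of the schema shape this implies (toy SQLAlchemy models, *not* Airflow's real ones):
   
   ```python
   # Toy models sketching the AIP-39 shape: execution_date lives on DagRun,
   # and TaskInstance references its run via (dag_id, run_id), so a DagRun
   # row must exist before any TaskInstance that belongs to it.
   from sqlalchemy import Column, DateTime, ForeignKeyConstraint, String, create_engine
   from sqlalchemy.orm import Session, declarative_base
   
   Base = declarative_base()
   
   class DagRun(Base):
       __tablename__ = "dag_run"
       dag_id = Column(String, primary_key=True)
       run_id = Column(String, primary_key=True)
       execution_date = Column(DateTime)
   
   class TaskInstance(Base):
       __tablename__ = "task_instance"
       dag_id = Column(String, primary_key=True)
       run_id = Column(String, primary_key=True)
       task_id = Column(String, primary_key=True)
       __table_args__ = (
           ForeignKeyConstraint(
               ["dag_id", "run_id"], ["dag_run.dag_id", "dag_run.run_id"]
           ),
       )
   
   engine = create_engine("sqlite://")
   Base.metadata.create_all(engine)
   with Session(engine) as session:
       # execution_date is fetched with a JOIN to dag_run instead of being
       # stored on task_instance itself.
       query = session.query(TaskInstance, DagRun.execution_date).join(
           DagRun,
           (TaskInstance.dag_id == DagRun.dag_id)
           & (TaskInstance.run_id == DagRun.run_id),
       )
   ```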
   
   This also fixes some of the tests to match reality, which I’ll comment on inline.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr merged pull request #18141: Fix quarantined tests affected by AIP-39

Posted by GitBox <gi...@apache.org>.
uranusjr merged pull request #18141:
URL: https://github.com/apache/airflow/pull/18141


   





[GitHub] [airflow] ashb commented on pull request #18141: Fix quarantined tests affected by AIP-39

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #18141:
URL: https://github.com/apache/airflow/pull/18141#issuecomment-916927994


   Yeah, quarantined tests that don't fail a build _essentially_ don't exist :( 





[GitHub] [airflow] uranusjr commented on a change in pull request #18141: Fix quarantined tests affected by AIP-39

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #18141:
URL: https://github.com/apache/airflow/pull/18141#discussion_r706131668



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -954,7 +954,7 @@ def _schedule_dag_run(
             unfinished_task_instances = (
                 session.query(TI)
                 .filter(TI.dag_id == dag_run.dag_id)
-                .filter(TI.execution_date == dag_run.execution_date)
+                .filter(TI.run_id == dag_run.run_id)

Review comment:
       Because of AIP-39. This code path does not seem to be covered by any existing test case?

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -3176,40 +3177,40 @@ def test_execute_queries_count_with_harvested_dags(self, expected_query_count, d
 
                     self.scheduler_job._run_scheduler_loop()
 
-    @parameterized.expand(
+    @pytest.mark.parametrize(
+        "expected_query_counts, dag_count, task_count, start_ago, schedule_interval, shape",
         [
-            # expected, dag_count, task_count, start_ago, schedule_interval, shape
-            # One DAG with one task per DAG file
-            ([9, 9, 9, 9], 1, 1, "1d", "None", "no_structure"),
-            ([9, 9, 9, 9], 1, 1, "1d", "None", "linear"),
-            ([21, 12, 12, 12], 1, 1, "1d", "@once", "no_structure"),
-            ([21, 12, 12, 12], 1, 1, "1d", "@once", "linear"),
-            ([21, 22, 24, 26], 1, 1, "1d", "30m", "no_structure"),
-            ([21, 22, 24, 26], 1, 1, "1d", "30m", "linear"),
-            ([21, 22, 24, 26], 1, 1, "1d", "30m", "binary_tree"),
-            ([21, 22, 24, 26], 1, 1, "1d", "30m", "star"),
-            ([21, 22, 24, 26], 1, 1, "1d", "30m", "grid"),
-            # One DAG with five tasks per DAG  file
-            ([9, 9, 9, 9], 1, 5, "1d", "None", "no_structure"),
-            ([9, 9, 9, 9], 1, 5, "1d", "None", "linear"),
-            ([21, 12, 12, 12], 1, 5, "1d", "@once", "no_structure"),
-            ([22, 13, 13, 13], 1, 5, "1d", "@once", "linear"),
-            ([21, 22, 24, 26], 1, 5, "1d", "30m", "no_structure"),
-            ([22, 24, 27, 30], 1, 5, "1d", "30m", "linear"),
-            ([22, 24, 27, 30], 1, 5, "1d", "30m", "binary_tree"),
-            ([22, 24, 27, 30], 1, 5, "1d", "30m", "star"),
-            ([22, 24, 27, 30], 1, 5, "1d", "30m", "grid"),
-            # 10 DAGs with 10 tasks per DAG file
-            ([9, 9, 9, 9], 10, 10, "1d", "None", "no_structure"),
-            ([9, 9, 9, 9], 10, 10, "1d", "None", "linear"),
-            ([84, 27, 27, 27], 10, 10, "1d", "@once", "no_structure"),
-            ([94, 40, 40, 40], 10, 10, "1d", "@once", "linear"),
-            ([84, 88, 88, 88], 10, 10, "1d", "30m", "no_structure"),
-            ([94, 114, 114, 114], 10, 10, "1d", "30m", "linear"),
-            ([94, 108, 108, 108], 10, 10, "1d", "30m", "binary_tree"),
-            ([94, 108, 108, 108], 10, 10, "1d", "30m", "star"),
-            ([94, 108, 108, 108], 10, 10, "1d", "30m", "grid"),
-        ]
+            # One DAG with one task per DAG file.
+            ([10, 10, 10, 10], 1, 1, "1d", "None", "no_structure"),
+            ([10, 10, 10, 10], 1, 1, "1d", "None", "linear"),
+            ([23, 13, 13, 13], 1, 1, "1d", "@once", "no_structure"),
+            ([23, 13, 13, 13], 1, 1, "1d", "@once", "linear"),
+            ([23, 24, 26, 28], 1, 1, "1d", "30m", "no_structure"),
+            ([23, 24, 26, 28], 1, 1, "1d", "30m", "linear"),
+            ([23, 24, 26, 28], 1, 1, "1d", "30m", "binary_tree"),
+            ([23, 24, 26, 28], 1, 1, "1d", "30m", "star"),
+            ([23, 24, 26, 28], 1, 1, "1d", "30m", "grid"),
+            # One DAG with five tasks per DAG file.
+            ([10, 10, 10, 10], 1, 5, "1d", "None", "no_structure"),
+            ([10, 10, 10, 10], 1, 5, "1d", "None", "linear"),
+            ([23, 13, 13, 13], 1, 5, "1d", "@once", "no_structure"),
+            ([24, 14, 14, 14], 1, 5, "1d", "@once", "linear"),
+            ([23, 24, 26, 28], 1, 5, "1d", "30m", "no_structure"),
+            ([24, 26, 29, 32], 1, 5, "1d", "30m", "linear"),
+            ([24, 26, 29, 32], 1, 5, "1d", "30m", "binary_tree"),
+            ([24, 26, 29, 32], 1, 5, "1d", "30m", "star"),
+            ([24, 26, 29, 32], 1, 5, "1d", "30m", "grid"),
+            # 10 DAGs with 10 tasks per DAG file.
+            ([10, 10, 10, 10], 10, 10, "1d", "None", "no_structure"),
+            ([10, 10, 10, 10], 10, 10, "1d", "None", "linear"),
+            ([95, 28, 28, 28], 10, 10, "1d", "@once", "no_structure"),
+            ([105, 41, 41, 41], 10, 10, "1d", "@once", "linear"),
+            ([95, 99, 99, 99], 10, 10, "1d", "30m", "no_structure"),
+            ([105, 125, 125, 125], 10, 10, "1d", "30m", "linear"),
+            ([105, 119, 119, 119], 10, 10, "1d", "30m", "binary_tree"),
+            ([105, 119, 119, 119], 10, 10, "1d", "30m", "star"),
+            ([105, 119, 119, 119], 10, 10, "1d", "30m", "grid"),
+        ],

Review comment:
       Query count changes in this function are not related to AIP-39 (I traced this breakage back to a change made a few months ago).

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -795,7 +795,7 @@ def _create_dagruns_for_dags(self, guard, session):
         guard.commit()
         # END: create dagruns
 
-    def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) -> None:
+    def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -> None:

Review comment:
       This annotation was wrong. `dag_models` is iterated over multiple times in this function, and `Iterable` does not guarantee that works: an iterator is an iterable, but it can only be consumed once. `Collection` signals that a re-iterable container is required.
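   
   A quick self-contained demonstration of the pitfall (generic Python, not the scheduler code itself):
   
   ```python
   from typing import Collection, Iterable
   
   def two_passes(items: Iterable[int]) -> tuple:
       # The second list() call sees an already-exhausted iterator.
       return list(items), list(items)
   
   print(two_passes(n for n in range(3)))   # ([0, 1, 2], []) -- data lost!
   print(two_passes([0, 1, 2]))             # ([0, 1, 2], [0, 1, 2])
   
   # Annotating the parameter as Collection[int] instead signals that
   # callers must pass something re-iterable (list, set, tuple, ...),
   # not a one-shot generator or iterator.
   def two_passes_fixed(items: Collection[int]) -> tuple:
       return list(items), list(items)
   ```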

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -3258,25 +3269,25 @@ def test_should_mark_dummy_task_as_success(self):
         dagbag = DagBag(dag_folder=dag_file, include_examples=False, read_dags_from_db=False)
         dagbag.sync_to_db()
 
-        self.scheduler_job_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job_job.processor_agent = mock.MagicMock()
-        dag = self.scheduler_job_job.dagbag.get_dag("test_only_dummy_tasks")
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.processor_agent = mock.MagicMock()
+        dag = self.scheduler_job.dagbag.get_dag("test_only_dummy_tasks")

Review comment:
       Totally unrelated to AIP-39.

##########
File path: tests/executors/test_celery_executor.py
##########
@@ -529,7 +529,7 @@ def test_send_tasks_to_celery_hang(register_signals):
     executor = celery_executor.CeleryExecutor()
 
     task = MockTask()
-    task_tuples_to_send = [(None, None, None, None, task) for _ in range(26)]
+    task_tuples_to_send = [(None, None, None, task) for _ in range(26)]

Review comment:
       `_send_tasks_to_celery` was modified to accept 4-tuples (not 5) a while ago, but this test was never updated. Not related to AIP-39.
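   
   For illustration, here is why stale 5-tuples break a code path that unpacks 4-tuples (plain Python, not the actual executor internals):
   
   ```python
   rows = [(None, None, None, "task")] * 3      # 4-tuples, as now expected
   stale = [(None, None, None, None, "task")]   # the old 5-tuple shape
   
   for key, command, queue, task in rows:       # unpacks cleanly
       pass
   
   try:
       for key, command, queue, task in stale:  # expects 4, gets 5
           pass
   except ValueError as exc:
       print(exc)  # too many values to unpack (expected 4)
   ```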

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1915,37 +1916,39 @@ def test_verify_integrity_if_dag_changed(self, dag_maker):
         session.close()
 
     @pytest.mark.quarantined
+    @pytest.mark.need_serialized_dag
     def test_retry_still_in_executor(self, dag_maker):
         """
         Checks if the scheduler does not put a task in limbo, when a task is retried
         but is still present in the executor.
         """
         executor = MockExecutor(do_update=False)
-        dagbag = DagBag(dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"), include_examples=False)
-        dagbag.dags.clear()
-
-        with dag_maker(dag_id='test_retry_still_in_executor', schedule_interval="@once") as dag:
-            dag_task1 = BashOperator(
-                task_id='test_retry_handling_op',
-                bash_command='exit 1',
-                retries=1,
-            )
 
         with create_session() as session:
-            orm_dag = DagModel(dag_id=dag.dag_id)
-            orm_dag.is_paused = False
-            session.merge(orm_dag)
+            with dag_maker(
+                dag_id='test_retry_still_in_executor',
+                schedule_interval="@once",
+                session=session,
+            ):
+                dag_task1 = BashOperator(
+                    task_id='test_retry_handling_op',
+                    bash_command='exit 1',
+                    retries=1,
+                )
+            dag_maker.dag_model.calculate_dagrun_date_fields(dag_maker.dag, None)
 
-        @mock.patch('airflow.dag_processing.processor.DagBag', return_value=dagbag)
-        def do_schedule(mock_dagbag):
+        @provide_session
+        def do_schedule(session):
             # Use a empty file since the above mock will return the
             # expected DAGs. Also specify only a single file so that it doesn't
             # try to schedule the above DAG repeatedly.
-            self.scheduler_job = SchedulerJob(
-                num_runs=1, executor=executor, subdir=os.path.join(settings.DAGS_FOLDER, "no_dags.py")
-            )
+            self.scheduler_job = SchedulerJob(num_runs=1, executor=executor, subdir=os.devnull)
+            self.scheduler_job.dagbag = dag_maker.dagbag
             self.scheduler_job.heartrate = 0
-            self.scheduler_job.run()
+            # Since the DAG is not in the directory watched by scheduler job,
+            # it would've been marked as deleted and not being scheduled.
+            with mock.patch.object(DagModel, "deactivate_deleted_dags"):
+                self.scheduler_job.run()

Review comment:
       This test was broken after we implemented auto DAG deactivation. Not related to AIP-39.
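   
   A minimal sketch of the patching pattern used above (the `DagModel` here is a stand-in class, not the real Airflow model):
   
   ```python
   from unittest import mock
   
   class DagModel:
       @staticmethod
       def deactivate_deleted_dags(alive_dag_filelocs):
           raise RuntimeError("would deactivate DAGs missing from disk")
   
   # Inside the block the method is replaced with a MagicMock, so the DAG
   # under test cannot be deactivated just because its file is not in the
   # directory the scheduler watches; the original is restored on exit.
   with mock.patch.object(DagModel, "deactivate_deleted_dags"):
       DagModel.deactivate_deleted_dags(set())  # harmless no-op while patched
   ```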







[GitHub] [airflow] uranusjr commented on a change in pull request #18141: Fix quarantined tests affected by AIP-39

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #18141:
URL: https://github.com/apache/airflow/pull/18141#discussion_r706332036



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1669,22 +1668,24 @@ def test_scheduler_keeps_scheduling_pool_full(self, dag_maker):
         with dag_maker(
             dag_id='test_scheduler_keeps_scheduling_pool_full_d1',
             start_date=DEFAULT_DATE,
-        ) as dag_d1:
+        ):
             BashOperator(
                 task_id='test_scheduler_keeps_scheduling_pool_full_t1',
                 pool='test_scheduler_keeps_scheduling_pool_full_p1',
                 bash_command='echo hi',
             )
+        dag_d1 = dag_maker.dag

Review comment:
       For some reason, if you use `dag_maker` more than once, the `as` variables are bound to the same DAG:
   
   ```python
   with dag_maker(...) as dag1:
       ...
   
   with dag_maker(...) as dag2:
       ...
   
   assert dag1 is dag2  # True!
   ```
   
   I don't understand how this could happen; maybe there's some pytest magic involved. But this is the easiest workaround anyway, so I just did it instead of going down the rabbit hole.
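   
   One way this could happen, as a purely speculative toy (this is *not* the real `dag_maker` fixture code): if `__enter__` hands out a cached first DAG while the `.dag` attribute is refreshed on every call, you get exactly this behaviour, and reading `dag_maker.dag` right after each `with` block is the correct workaround.
   
   ```python
   from types import SimpleNamespace
   
   class FakeDagMaker:
       def __call__(self, dag_id):
           self.dag = SimpleNamespace(dag_id=dag_id)  # fresh object per call
           return self
   
       def __enter__(self):
           if not hasattr(self, "_cached"):
               self._cached = self.dag  # imitated bug: first DAG cached forever
           return self._cached
   
       def __exit__(self, *exc):
           return False
   
   dag_maker = FakeDagMaker()
   with dag_maker("d1") as dag1:
       pass
   d1 = dag_maker.dag                # the workaround: read .dag instead
   with dag_maker("d2") as dag2:
       pass
   
   assert dag1 is dag2               # the surprising aliasing
   assert d1.dag_id == "d1"          # .dag was correct at capture time
   ```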




[GitHub] [airflow] github-actions[bot] commented on pull request #18141: Fix quarantined tests affected by AIP-39

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #18141:
URL: https://github.com/apache/airflow/pull/18141#issuecomment-916860597


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it to the latest main at your convenience, or amend the last commit of the PR and push it with `--force-with-lease`.





[GitHub] [airflow] ashb commented on a change in pull request #18141: Fix quarantined tests affected by AIP-39

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #18141:
URL: https://github.com/apache/airflow/pull/18141#discussion_r706209185



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1669,22 +1668,24 @@ def test_scheduler_keeps_scheduling_pool_full(self, dag_maker):
         with dag_maker(
             dag_id='test_scheduler_keeps_scheduling_pool_full_d1',
             start_date=DEFAULT_DATE,
-        ) as dag_d1:
+        ):
             BashOperator(
                 task_id='test_scheduler_keeps_scheduling_pool_full_t1',
                 pool='test_scheduler_keeps_scheduling_pool_full_p1',
                 bash_command='echo hi',
             )
+        dag_d1 = dag_maker.dag

Review comment:
       I thought `dag_maker` yielded the DAG, so this style of change shouldn't be necessary?



