Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/05 07:49:46 UTC

[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #21877: AIP-45 Remove dag parsing in airflow run local

ephraimbuddy commented on code in PR #21877:
URL: https://github.com/apache/airflow/pull/21877#discussion_r842466796


##########
airflow/utils/cli.py:
##########
@@ -213,6 +213,16 @@ def get_dag(subdir: Optional[str], dag_id: str) -> "DAG":
     return dagbag.dags[dag_id]
 
 
+def get_dag_by_deserialization(dag_id: str) -> "DAG":
+    from airflow.models.serialized_dag import SerializedDagModel
+
+    dag_model = SerializedDagModel.get(dag_id)
+    if dag_model is None:
+        raise AirflowException(f"Serialized dag_id could not be found: {dag_id}")

Review Comment:
   ```suggestion
           raise AirflowException(f"Serialized DAG: {dag_id} could not be found")
   ```
   How about this?
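
   For reference, a minimal sketch of the full helper with the suggested wording; the return path is an assumption, since the hunk above is cut off before it:

   ```python
   from typing import TYPE_CHECKING

   from airflow.exceptions import AirflowException

   if TYPE_CHECKING:
       from airflow.models.dag import DAG


   def get_dag_by_deserialization(dag_id: str) -> "DAG":
       from airflow.models.serialized_dag import SerializedDagModel

       dag_model = SerializedDagModel.get(dag_id)
       if dag_model is None:
           # Wording proposed in the suggestion above.
           raise AirflowException(f"Serialized DAG: {dag_id} could not be found")
       # Assumed: SerializedDagModel.dag yields the deserialized DAG object;
       # the hunk in the diff is truncated before this line.
       return dag_model.dag
   ```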



##########
airflow/jobs/scheduler_job.py:
##########
@@ -783,6 +783,22 @@ def _execute(self) -> None:
                     self.log.exception("Exception when executing DagFileProcessorAgent.end")
             self.log.info("Exited execute loop")
 
+    def _update_dag_run_state_for_paused_dags(self):
+        try:
+            paused_dag_ids = DagModel.get_all_paused_dag_ids()
+            for dag_id in paused_dag_ids:

Review Comment:
   Hmm, for a deployment with 3000 DAGs, I wonder whether looping over every paused DAG here would become a bottleneck?
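
   To make the scale concern concrete, here is a rough sketch (not the PR's implementation; the model and column names come from Airflow, the query shape is an assumption) of how the per-DAG loop could be collapsed into a single joined query:

   ```python
   from airflow.models.dag import DagModel
   from airflow.models.dagrun import DagRun
   from airflow.utils.session import create_session
   from airflow.utils.state import State

   with create_session() as session:
       # One statement instead of one round trip per paused dag_id.
       paused_running_runs = (
           session.query(DagRun)
           .join(DagModel, DagRun.dag_id == DagModel.dag_id)
           .filter(DagModel.is_paused.is_(True), DagRun.state == State.RUNNING)
           .all()
       )
       for dag_run in paused_running_runs:
           ...  # whatever per-run state update the new helper performs would go here
   ```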



##########
tests/jobs/test_local_task_job.py:
##########
@@ -276,39 +278,26 @@ def test_heartbeat_failed_fast(self):
                 delta = (time2 - time1).total_seconds()
                 assert abs(delta - job.heartrate) < 0.5
 
-    @patch('airflow.utils.process_utils.subprocess.check_call')
-    @patch.object(StandardTaskRunner, 'return_code')
-    def test_mark_success_no_kill(self, mock_return_code, _check_call, caplog, dag_maker):
+    def test_mark_success_no_kill(self, caplog, get_test_dag):
         """
         Test that ensures that mark_success in the UI doesn't cause
         the task to fail, and that the task exits
         """
-        session = settings.Session()
-
-        def task_function(ti):
-            assert ti.state == State.RUNNING
-            # Simulate marking this successful in the UI
-            ti.state = State.SUCCESS
-            session.merge(ti)
-            session.commit()
-            # The below code will not run as heartbeat will detect change of state
-            time.sleep(10)
-
-        with dag_maker('test_mark_success'):
-            task = PythonOperator(task_id="task1", python_callable=task_function)
-        dr = dag_maker.create_dagrun()
+        dag = get_test_dag('test_mark_state')
+        with create_session() as session:

Review Comment:
   We have a `session` fixture that could be used here instead of `create_session()`.
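
   For illustration, a minimal sketch of that suggestion, assuming the suite-wide `session` fixture from conftest; the body is indicative only, since the hunk above is truncated at this point:

   ```python
   def test_mark_success_no_kill(self, caplog, get_test_dag, session):
       """
       Test that ensures that mark_success in the UI doesn't cause
       the task to fail, and that the task exits
       """
       dag = get_test_dag('test_mark_state')
       # Use the injected `session` fixture directly; the
       # `with create_session() as session:` block is no longer needed.
       ...
   ```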



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org