Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/14 19:00:09 UTC

[GitHub] [airflow] ashb commented on a diff in pull request #26400: Create a more efficient airflow dag test command that also has better local logging

ashb commented on code in PR #26400:
URL: https://github.com/apache/airflow/pull/26400#discussion_r971187508


##########
airflow/cli/commands/dag_command.py:
##########
@@ -497,6 +523,60 @@ def dag_test(args, session=None):
             print(dot_graph.source)
 
 
+@provide_session
+def _run_task(ti: TaskInstance, session=None):
+    """
+    Run a single task instance, and push result to Xcom for downstream tasks. Bypasses a lot of
+    extra steps used in `task.run` to keep our local running as fast as possible
+    Args:
+        ti: TaskInstance to run
+    """
+    current_task = ti.render_templates(ti.get_template_context())
+    log.info("*****************************************************")
+    log.info("Running task %s", current_task.task_id)
+    try:
+        xcom_value = current_task.execute(context=ti.get_template_context())

Review Comment:
   ```suggestion
           xcom_value = ti.task.execute(context=ti.get_template_context(session=session))
   ```



##########
airflow/cli/commands/dag_command.py:
##########
@@ -497,6 +523,60 @@ def dag_test(args, session=None):
             print(dot_graph.source)
 
 
+@provide_session
+def _run_task(ti: TaskInstance, session=None):

Review Comment:
   ```suggestion
   def _run_task(ti: TaskInstance, session):
   ```



##########
airflow/cli/commands/dag_command.py:
##########
@@ -460,19 +478,27 @@ def dag_test(args, session=None):
     execution_date = args.execution_date or timezone.utcnow()
     dag = get_dag(subdir=args.subdir, dag_id=args.dag_id)
     dag.clear(start_date=execution_date, end_date=execution_date, dag_run_state=False)
-    try:
-        dag.run(
-            executor=DebugExecutor(),
-            start_date=execution_date,
-            end_date=execution_date,
-            conf=run_conf,
-            # Always run the DAG at least once even if no logical runs are
-            # available. This does not make a lot of sense, but Airflow has
-            # been doing this prior to 2.2 so we keep compatibility.
-            run_at_least_once=True,
-        )
-    except BackfillUnfinished as e:
-        print(str(e))
+
+    dr: DagRun = _get_or_create_dagrun(
+        dag=dag,
+        start_date=args.execution_date,
+        execution_date=args.execution_date,
+        run_id=DagRun.generate_run_id(DagRunType.MANUAL, execution_date),
+        session=session,
+        conf=run_conf,
+    )
+
+    tasks = dag.task_dict
+    log.info("starting dagrun")
+    # Instead of starting a scheduler, we run the minimal loop possible to check
+    # for task readiness and dependency management. This is notably faster
+    # than creating a BackfillJob and allows us to surface logs to the user
+    while dr.state == State.RUNNING:
+        schedulable_tis, _ = dr.update_state(session=session)
+        for ti in schedulable_tis:
+            add_logger_if_needed(ti)
+            ti.task = tasks[ti.task_id]
+            _run_task(ti)

Review Comment:
   General rule: only use `@provide_session` on the "outside" of an API (the entry point that owns the session). Internal helpers should take the caller's session explicitly. So:
   
   ```suggestion
               _run_task(ti, session=session)
   ```
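The loop quoted above replaces `BackfillJob` with a minimal readiness check. Below is a toy model of the idea. It uses plain dicts instead of `DagRun`/`TaskInstance`, and `run_dag_locally` is a hypothetical helper, not Airflow API. The loop repeatedly collects tasks whose upstreams have all succeeded, runs them in-process, and stops once nothing is runnable.

```python
from enum import Enum


class State(Enum):
    # Reduced, hypothetical state set for this sketch.
    NONE = "none"
    SUCCESS = "success"
    FAILED = "failed"


def run_dag_locally(upstream, run_task):
    """Run tasks in dependency order, in-process, without a scheduler.

    upstream: dict mapping task_id -> set of upstream task_ids
    run_task: callable that executes one task and raises on failure
    """
    states = {task_id: State.NONE for task_id in upstream}
    order = []
    while True:
        # Rough analogue of dr.update_state(): tasks not yet run whose
        # upstreams all succeeded (an implicit all_success trigger rule).
        schedulable = [
            task_id
            for task_id, state in states.items()
            if state is State.NONE
            and all(states[u] is State.SUCCESS for u in upstream[task_id])
        ]
        if not schedulable:
            break  # nothing runnable is left, so the "run" is over
        for task_id in schedulable:
            try:
                run_task(task_id)
                states[task_id] = State.SUCCESS
            except Exception:
                states[task_id] = State.FAILED
            order.append(task_id)
    return order, states
```

The real code keys off `dr.state == State.RUNNING` and delegates readiness to `DagRun.update_state`, which also honours trigger rules other than all-success. The toy only shows the shape of the loop.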



##########
airflow/cli/commands/dag_command.py:
##########
@@ -497,6 +523,60 @@ def dag_test(args, session=None):
             print(dot_graph.source)
 
 
+@provide_session
+def _run_task(ti: TaskInstance, session=None):
+    """
+    Run a single task instance, and push result to Xcom for downstream tasks. Bypasses a lot of
+    extra steps used in `task.run` to keep our local running as fast as possible
+    Args:
+        ti: TaskInstance to run
+    """
+    current_task = ti.render_templates(ti.get_template_context())
+    log.info("*****************************************************")
+    log.info("Running task %s", current_task.task_id)
+    try:
+        xcom_value = current_task.execute(context=ti.get_template_context())

Review Comment:
   You also aren't calling `pre_execute` or `post_execute` here, so operators that rely on those hooks might not work.
   
   You probably want to call a method on the TI instead.
   
   Possibly `ti._run_raw_task(session=session, test_mode=True)`
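A toy illustration of the hook ordering this comment refers to. `ToyOperator` and both helper functions are hypothetical. The sequence in `run_with_hooks` only approximates what a full task run does around `execute()`, since the real path through the TaskInstance also handles state, callbacks, and more.

```python
class ToyOperator:
    """Hypothetical operator using the same hook names as BaseOperator."""

    def __init__(self):
        self.calls = []
        self.connection = None

    def pre_execute(self, context):
        # Sets up state that execute() depends on.
        self.calls.append("pre_execute")
        self.connection = "open"

    def execute(self, context):
        self.calls.append("execute")
        if self.connection is None:
            raise RuntimeError("pre_execute was never called")
        return 42

    def post_execute(self, context, result=None):
        self.calls.append("post_execute")


def run_execute_only(op, context):
    # What the draft _run_task does: call execute() directly.
    return op.execute(context=context)


def run_with_hooks(op, context):
    # Approximate order of a full task run around execute().
    op.pre_execute(context=context)
    result = op.execute(context=context)
    op.post_execute(context=context, result=result)
    return result
```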



##########
airflow/cli/commands/dag_command.py:
##########
@@ -497,6 +523,60 @@ def dag_test(args, session=None):
             print(dot_graph.source)
 
 
+@provide_session
+def _run_task(ti: TaskInstance, session=None):
+    """
+    Run a single task instance, and push result to Xcom for downstream tasks. Bypasses a lot of
+    extra steps used in `task.run` to keep our local running as fast as possible
+    Args:
+        ti: TaskInstance to run
+    """
+    current_task = ti.render_templates(ti.get_template_context())

Review Comment:
   ```suggestion
       ti.render_templates(ti.get_template_context())
   ```
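The suggestion drops the `current_task =` assignment. My understanding of `TaskInstance.render_templates` in Airflow 2.3+ (an assumption worth checking against its docstring) is as follows. For a mapped task, it may replace `ti.task` with the unmapped, fully rendered operator and return the original task. Executing the return value could then run with unrendered fields, so `ti.task` is the object to use afterwards. The toy below models only that mapped-task case. `ToyTask` and `ToyTaskInstance` are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class ToyTask:
    task_id: str
    template: str
    rendered: bool = False


@dataclass
class ToyTaskInstance:
    task: ToyTask

    def render_templates(self, context):
        # Assumed behaviour (mapped-task case): install a rendered copy on
        # self.task and hand back the original, unrendered task.
        original = self.task
        self.task = ToyTask(
            task_id=original.task_id,
            template=original.template.format(**context),
            rendered=True,
        )
        return original
```

After `returned = ti.render_templates({"name": "world"})` on a task templated as `"hello {name}"`, `returned.template` is still `"hello {name}"`, while `ti.task.template` is `"hello world"`.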



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org