You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/20 07:41:48 UTC

[GitHub] [airflow] feluelle commented on a diff in pull request #26400: Create a more efficient airflow dag test command that also has better local logging

feluelle commented on code in PR #26400:
URL: https://github.com/apache/airflow/pull/26400#discussion_r974983366


##########
airflow/models/dag.py:
##########
@@ -3505,3 +3562,68 @@ def get_current_dag(cls) -> DAG | None:
             return cls._context_managed_dags[0]
         except IndexError:
             return None
+
+
+def _run_task(ti: TaskInstance, session):
+    """
+    Run a single task instance, and push result to Xcom for downstream tasks. Bypasses a lot of
+    extra steps used in `task.run` to keep our local running as fast as possible
+    This function is only meant for the `dag.test` function as a helper function.
+
+    Args:
+        ti: TaskInstance to run
+    """
+    log.info("*****************************************************")
+    if ti.map_index > 0:
+        log.info("Running task %s index %d", ti.task_id, ti.map_index)
+    else:
+        log.info("Running task %s", ti.task_id)
+    try:
+        ti._run_raw_task(session=session)
+        session.flush()
+        log.info("%s ran successfully!", ti.task_id)
+    except AirflowSkipException:
+        log.info("Task Skipped, continuing")
+    log.info("*****************************************************")
+
+
+def _get_or_create_dagrun(
+    dag: DAG,
+    conf: dict[Any, Any] | None,
+    start_date: datetime,
+    execution_date: datetime,
+    run_id: str,
+    session: Session,
+) -> DagRun:
+    """
+    Create a DAGRun, but only after clearing the previous instance of said dagrun to prevent collisions.
+    This function is only meant for the `dag.test` function as a helper function.
+    :param dag: Dag to be used to find dagrun
+    :param conf: configuration to pass to newly created dagrun
+    :param start_date: start date of new dagrun, defaults to execution_date
+    :param execution_date: execution_date for finding the dagrun
+    :param run_id: run_id to pass to new dagrun
+    :param session: sqlalchemy session
+    :return:
+    """
+    log.info("dagrun id:" + dag.dag_id)

Review Comment:
   ```suggestion
       log.info("dagrun id: %s", dag.dag_id)
   ```



##########
airflow/models/dag.py:
##########
@@ -2435,6 +2436,62 @@ def cli(self):
         args = parser.parse_args()
         args.func(args, self)
 
+    @provide_session
+    def test(
+        self,
+        execution_date: datetime | None = None,
+        run_conf: dict[str, Any] | None = None,
+        session: Session = NEW_SESSION,
+    ) -> None:
+        """Execute one single DagRun for a given DAG and execution date."""
+
+        def add_logger_if_needed(ti: TaskInstance):
+            """
+            Add a formatted logger to the taskinstance so all logs are surfaced to the command line instead
+            of into a task file. Since this is a local test run, it is much better for the user to see logs
+            in the command line, rather than needing to search for a log file.
+            Args:
+                ti: The taskinstance that will receive a logger
+
+            """
+            format = logging.Formatter("[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s")
+            handler = logging.StreamHandler(sys.stdout)
+            handler.level = logging.INFO
+            handler.setFormatter(format)
+            # only add log handler once
+            if not any(isinstance(h, logging.StreamHandler) for h in ti.log.handlers):
+                self.log.debug("Adding Streamhandler to taskinstance %s", ti.task_id)
+                ti.log.addHandler(handler)
+
+        execution_date = execution_date or timezone.utcnow()
+        self.clear(
+            start_date=execution_date,
+            end_date=execution_date,
+            dag_run_state=False,  # type: ignore
+            session=session,
+        )

Review Comment:
   ```suggestion
           self.log.debug("Clearing existing task instances for execution date %s", execution_date)
           self.clear(
               start_date=execution_date,
               end_date=execution_date,
               dag_run_state=False,  # type: ignore
               session=session,
           )
   ```



##########
airflow/models/dag.py:
##########
@@ -3505,3 +3562,68 @@ def get_current_dag(cls) -> DAG | None:
             return cls._context_managed_dags[0]
         except IndexError:
             return None
+
+
+def _run_task(ti: TaskInstance, session):
+    """
+    Run a single task instance, and push result to Xcom for downstream tasks. Bypasses a lot of
+    extra steps used in `task.run` to keep our local running as fast as possible
+    This function is only meant for the `dag.test` function as a helper function.
+
+    Args:
+        ti: TaskInstance to run
+    """
+    log.info("*****************************************************")
+    if ti.map_index > 0:
+        log.info("Running task %s index %d", ti.task_id, ti.map_index)
+    else:
+        log.info("Running task %s", ti.task_id)
+    try:
+        ti._run_raw_task(session=session)
+        session.flush()
+        log.info("%s ran successfully!", ti.task_id)
+    except AirflowSkipException:
+        log.info("Task Skipped, continuing")
+    log.info("*****************************************************")
+
+
+def _get_or_create_dagrun(
+    dag: DAG,
+    conf: dict[Any, Any] | None,
+    start_date: datetime,
+    execution_date: datetime,
+    run_id: str,
+    session: Session,
+) -> DagRun:
+    """
+    Create a DAGRun, but only after clearing the previous instance of said dagrun to prevent collisions.
+    This function is only meant for the `dag.test` function as a helper function.
+    :param dag: Dag to be used to find dagrun
+    :param conf: configuration to pass to newly created dagrun
+    :param start_date: start date of new dagrun, defaults to execution_date
+    :param execution_date: execution_date for finding the dagrun
+    :param run_id: run_id to pass to new dagrun
+    :param session: sqlalchemy session
+    :return:
+    """
+    log.info("dagrun id:" + dag.dag_id)
+    dr: DagRun = (
+        session.query(DagRun)
+        .filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date == execution_date)
+        .first()
+    )
+    if dr:
+        session.delete(dr)
+        session.commit()
+    dr = dag.create_dagrun(
+        state=DagRunState.RUNNING,
+        execution_date=execution_date,
+        run_id=run_id,
+        start_date=start_date or execution_date,
+        session=session,
+        conf=conf,  # type: ignore
+    )
+    session.add(dr)
+    session.flush()

Review Comment:
   `dag.create_dagrun` already calls this (see https://github.com/apache/airflow/blob/b885b906674d31fc2d15443e0d9d076007a170b1/airflow/models/dag.py#L2584-L2585).



##########
airflow/models/dag.py:
##########
@@ -3505,3 +3562,68 @@ def get_current_dag(cls) -> DAG | None:
             return cls._context_managed_dags[0]
         except IndexError:
             return None
+
+
+def _run_task(ti: TaskInstance, session):
+    """
+    Run a single task instance, and push result to Xcom for downstream tasks. Bypasses a lot of
+    extra steps used in `task.run` to keep our local running as fast as possible
+    This function is only meant for the `dag.test` function as a helper function.
+
+    Args:
+        ti: TaskInstance to run
+    """
+    log.info("*****************************************************")
+    if ti.map_index > 0:
+        log.info("Running task %s index %d", ti.task_id, ti.map_index)
+    else:
+        log.info("Running task %s", ti.task_id)
+    try:
+        ti._run_raw_task(session=session)
+        session.flush()
+        log.info("%s ran successfully!", ti.task_id)
+    except AirflowSkipException:
+        log.info("Task Skipped, continuing")
+    log.info("*****************************************************")
+
+
+def _get_or_create_dagrun(
+    dag: DAG,
+    conf: dict[Any, Any] | None,
+    start_date: datetime,
+    execution_date: datetime,
+    run_id: str,
+    session: Session,
+) -> DagRun:
+    """
+    Create a DAGRun, but only after clearing the previous instance of said dagrun to prevent collisions.
+    This function is only meant for the `dag.test` function as a helper function.
+    :param dag: Dag to be used to find dagrun
+    :param conf: configuration to pass to newly created dagrun
+    :param start_date: start date of new dagrun, defaults to execution_date
+    :param execution_date: execution_date for finding the dagrun
+    :param run_id: run_id to pass to new dagrun
+    :param session: sqlalchemy session
+    :return:
+    """
+    log.info("dagrun id:" + dag.dag_id)
+    dr: DagRun = (
+        session.query(DagRun)
+        .filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date == execution_date)
+        .first()
+    )
+    if dr:
+        session.delete(dr)
+        session.commit()

Review Comment:
   It might make sense to create a `_delete_dagrun` helper for that, mainly because of the low-level session calls, IMO. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org