Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/03/28 06:15:59 UTC

[GitHub] [airflow] koliankolin commented on issue #14975: How to trigger dag manually in 2.0.1 version with pickled dags

koliankolin commented on issue #14975:
URL: https://github.com/apache/airflow/issues/14975#issuecomment-808853781


   UPD: I found out the reasons that cause this problem.
   
   As I mentioned before, the bug is reproduced by filling pickle_id in the dag table. For that purpose we use DagModel.
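   
   Something like this (a minimal sketch with an assumed stand-in DAG, not our exact code):
   ```
   from airflow import DAG
   from airflow.models import DagModel, DagPickle
   from airflow.utils.session import create_session
   
   dag = DAG(dag_id="dummy")  # stand-in for our generated DAG
   
   with create_session() as session:
       dag_pickle = DagPickle(dag)  # pickle the in-memory DAG object
       session.add(dag_pickle)
       session.flush()              # assigns dag_pickle.id
       session.query(DagModel).filter(DagModel.dag_id == dag.dag_id).update(
           {DagModel.pickle_id: dag_pickle.id}  # fill pickle_id in the dag table
       )
   ```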
   
   I should start with the fact that the task_instance to launch is created differently depending on how it is started. For example, with DebugExecutor we import TaskInstance and create a simple instance of it; the cli command (airflow tasks run --pickle 1) follows the same logic as DebugExecutor; but when the _scheduler_ is used, the logic changes. In the file scheduler_job.py the task_instance objects are created in the
   method **airflow.jobs.scheduler_job.SchedulerJob._executable_task_instances_to_queued**, which contains this kind of code:
   ```
   query = (
       session.query(TI)
       .outerjoin(TI.dag_run)
       .filter(or_(DR.run_id.is_(None), DR.run_type != DagRunType.BACKFILL_JOB))
       .join(TI.dag_model)
       .filter(not_(DM.is_paused))
       .filter(TI.state == State.SCHEDULED)
       # eagerly loads DagModel onto each TaskInstance, including its pickle_id
       .options(selectinload('dag_model'))
       .limit(max_tis)
   )
   ```
   As you can see, we can reproduce the bug only with the scheduler, because only in this case do we enrich our task_instance with DagModel. In other words, only when the _scheduler_ is used does the task_instance get a pickle_id. Ok, keep this fact in mind and move on :)
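   
   To make it concrete, later in scheduler_job.py the command for the executor is built from that enriched task_instance, roughly like this (paraphrased from 2.0.x, not copied verbatim):
   ```
   # because dag_model was eagerly loaded above, pickle_id comes
   # straight from the dag table row we filled earlier
   command = TI.generate_command(
       ti.dag_id,
       ti.task_id,
       ti.execution_date,
       local=True,
       pool=ti.pool,
       file_path=ti.dag_model.fileloc,
       pickle_id=ti.dag_model.pickle_id,  # <- this is where --pickle comes from
   )
   ```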
   
   Next, the method **airflow.models.taskinstance.TaskInstance.generate_command** is used to build the command as a list, and because of the fact above (pickle_id is set) we get a list like this:
   ```
   ['airflow', 'tasks', 'run', 'dummy', 'init', '2021-03-26T11:30:20.470827+00:00', '--pickle', '11', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/airflow/dags/dag_generator.py']
   ```
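   
   You can verify this in isolation - generate_command is a static method, and the '--pickle' pair is appended only when pickle_id is set (a quick sketch, with the values taken from the list above):
   ```
   from datetime import datetime, timezone
   
   from airflow.models.taskinstance import TaskInstance
   
   command = TaskInstance.generate_command(
       dag_id="dummy",
       task_id="init",
       execution_date=datetime(2021, 3, 26, 11, 30, 20, 470827, tzinfo=timezone.utc),
       local=True,
       pickle_id=11,  # leave this as None and no '--pickle' appears
       pool="default_pool",
       file_path="/usr/local/airflow/dags/dag_generator.py",
   )
   print(command)
   ```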
   Ok, after this command list is formed, an instance of **airflow.task.task_runner.standard_task_runner.StandardTaskRunner** is created, and in our case its method **airflow.task.task_runner.standard_task_runner.StandardTaskRunner._start_by_fork** is used to start the task_instance. That is where all the magic happens :)
   
   In this method the command list is fed to the argument parser, and the resulting args flow into this kind of code:
   ```
   parser = get_parser()
   # [1:] - remove "airflow" from the start of the command
   args = parser.parse_args(self._command[1:])
   
   self.log.info('Running: %s', self._command)
   self.log.info('Job %s: Subtask %s', self._task_instance.job_id, self._task_instance.task_id)
   
   proc_title = "airflow task runner: {0.dag_id} {0.task_id} {0.execution_date}"
   if hasattr(args, "job_id"):
       proc_title += " {0.job_id}"
   setproctitle(proc_title.format(args))
   
   try:
       args.func(args, dag=self.dag)
       return_code = 0
   except Exception:  # pylint: disable=broad-except
       return_code = 1
   finally:
       # Explicitly flush any pending exception to Sentry if enabled
       Sentry.flush()
       logging.shutdown()
       os._exit(return_code)  # pylint: disable=protected-access
   ``` 
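   
   For context, args.func here is whatever handler the parser resolved for the "tasks run" sub-command. A quick sketch of what the parser does with our command list (the comments describe what I would expect, not captured output):
   ```
   from airflow.cli.cli_parser import get_parser
   
   parser = get_parser()
   args = parser.parse_args(
       ['tasks', 'run', 'dummy', 'init', '2021-03-26T11:30:20.470827+00:00',
        '--pickle', '11']
   )
   print(args.func)    # the handler behind "airflow tasks run"
   print(args.pickle)  # '11' - truthy, which matters for the check we hit below
   ```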
   The try-except block is very interesting. **We can see that exceptions of every kind are swallowed and nothing is written to the logs.** I think that is not good practice, but I didn't fall into despair: I changed the except clause to bind the exception (except Exception as e:) and added this line before return_code = 1:
   ```
   self.log.error(e)
   ```
   and triggered the dag from the UI. Several seconds after the dag failed, I saw this in the logs in the UI:
   ```
   2021-03-26 15:15:29,991|ERROR|standard_task_runner.py:88|You cannot use the --pickle option when using DAG.cli() method.
   ```
   Hmmm... What a strange exception... After some digging I found this line of code:
   ```
   if dag and args.pickle:
       raise AirflowException("You cannot use the --pickle option when using DAG.cli() method.")
   ```
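   
   For reference, this check sits at the top of the handler behind airflow tasks run, i.e. **airflow.cli.commands.task_command.task_run** (paraphrased from 2.0.x; the dag argument exists so that DAG.cli() can hand over an already-built DAG, in which case unpickling another copy would make no sense):
   ```
   from airflow.exceptions import AirflowException
   
   def task_run(args, dag=None):
       """Runs a single task instance"""
       if dag and args.pickle:
           raise AirflowException("You cannot use the --pickle option when using DAG.cli() method.")
       # ... otherwise the DAG is loaded from --subdir, or from the pickle table
   ```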
   
   What does it mean? It means there is a **logic bug**: enriching the task_instance with a pickle_id inevitably leads to this exception, and it is triggered by these lines of the method **airflow.task.task_runner.standard_task_runner.StandardTaskRunner._start_by_fork**:
   ```
   try:
       args.func(args, dag=self.dag)
       return_code = 0
   ```
   because exactly there the --pickle flag (inside args) and an in-memory dag instance are passed together. That's all from me.
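   
   If anyone needs to unblock themselves, the collision can be avoided at exactly that call site (just a sketch of a possible workaround, not an official fix): don't pass the pre-loaded dag when the command carries --pickle, so task_run loads the pickled DAG from the database instead.
   ```
   # hypothetical patch for _start_by_fork (NOT the upstream fix):
   # when --pickle was parsed, let task_run fetch the DagPickle itself
   # instead of handing it the in-memory DAG
   dag = None if getattr(args, "pickle", None) else self.dag
   args.func(args, dag=dag)
   return_code = 0
   ```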

