Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/07/30 14:33:34 UTC

[GitHub] [airflow] huf-92 opened a new issue #17343: Dags added by DagBag interrupt randomly

huf-92 opened a new issue #17343:
URL: https://github.com/apache/airflow/issues/17343


   
   **Apache Airflow version**: 2.1.2
   
   **Kubernetes version**: not used
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**:
   - **OS**: CentOS Stream 8
   - **Kernel**: Linux 4.18.0-315.el8.x86_64
   - **Install tools**:
   - **Others**:
   
   **What happened**:
   
   I added this script
   
   	import logging
   	from airflow.models import DagBag
   
   	# folders with dag definitions to add
   	dags_dirs = ["random/path"]
   	for dag_dir in dags_dirs:
   		# add each DAG folder as DagBag
   		dag_bag = DagBag(dag_dir)
   		if dag_bag:
   			for dag_id, dag in dag_bag.dags.items():
   				globals()[dag_id] = dag
   		else:
   			logging.error("dag_bag error: %s", str(dag_bag))
   
   to my Airflow Dags folder to load additional Dags from other locations. The
   code above successfully adds all the Dags in random/path, and I can trigger
   them. Some of the Dags contain a task with
   
   	trigger = TriggerDagRunOperator(
   		task_id="trigger",
   		trigger_dag_id=dag_name,
   		conf={"group_nr": next_group_nr, "group": next_group},
   	)
   	trigger.execute({})
   
   which triggers the same Dag again with a different configuration. The
   self-triggering should stop once certain conditions are fulfilled.
   Unfortunately, the process stops at random with the error message
   
   	*** Reading local file: /opt/airflow/logs/random_dag/generate_new_dag/2021-07-29T11:00:41.783999+00:00/1.log
   	[2021-07-29 13:04:35,678] {standard_task_runner.py:52} INFO - Started process 2649819 to run task
   	[2021-07-29 13:04:35,684] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'random_dag', 'generate_new_dag', '2021-07-29T11:00:41.783999+00:00', '--job-id', '25286', '--pool', 'default_pool', '--raw', '--subdir', '/opt/airflow-dags/random_folder/dags/random_dag.py', '--cfg-path', '/tmp/tmpx_0j_xtr', '--error-file', '/tmp/tmpnpudbc15']
   	[2021-07-29 13:04:35,685] {standard_task_runner.py:77} INFO - Job 25286: Subtask generate_new_dag
   	[2021-07-29 13:04:35,964] {dag_generator.py:69} ERROR - DAG 'random_dag' not found in serialized_dag table
   	[2021-07-29 13:04:36,013] {local_task_job.py:149} INFO - Task exited with return code 1
   
   My issue could be related to [#13504](https://github.com/apache/airflow/issues/13504)
   with the difference that in #13504 the Dags were not added by a DagBag.
   
   **What you expected to happen**:
   
   When I keep all my Dags in the Airflow Dags folder, everything works fine
   without random interruptions. The same should hold for Dags added
   by a DagBag.
   
   <!-- What do you think went wrong? -->
   
   I guess the self-triggering process is interrupted randomly because my Dags
   are triggered at the same time as the scheduler reloads the Dags in
   random/path. If so, the TriggerDagRunOperator should wait until the scheduler
   has reloaded all Dags.
   
   **How to reproduce it**:
   
   Add a Dag through a DagBag that keeps triggering itself, and wait until it
   is interrupted.
   
   **Anything else we need to know**:
   
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on issue #17343: Dags added by DagBag interrupt randomly

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on issue #17343:
URL: https://github.com/apache/airflow/issues/17343#issuecomment-926249675


   This issue has been closed because it has not received a response from the issue author.





[GitHub] [airflow] github-actions[bot] commented on issue #17343: Dags added by DagBag interrupt randomly

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on issue #17343:
URL: https://github.com/apache/airflow/issues/17343#issuecomment-921344500


   This issue has been automatically marked as stale because it has been open for 30 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.





[GitHub] [airflow] huf-92 commented on issue #17343: Dags added by DagBag interrupt randomly

Posted by GitBox <gi...@apache.org>.
huf-92 commented on issue #17343:
URL: https://github.com/apache/airflow/issues/17343#issuecomment-900149194


   Thank you for your fast reply.
   Removing <code>trigger.execute({})</code> prevents the self-triggering process from starting. Perhaps I described my self-triggering process badly. As shown below, the TriggerDagRunOperator is wrapped in a function called by a PythonOperator,
   which is why trigger.execute({}) is needed. Or is there a better way to trigger the same DAG from a function called by an operator?
   ```python
   def _trigger_dag():
       # Trigger the same DAG again only while the condition holds
       if random_condition:
           trigger = TriggerDagRunOperator(
               task_id="trigger",
               trigger_dag_id=dag_name,
               conf={"group_nr": next_group_nr, "group": next_group},
           )
           trigger.execute({})
   
   # The callable is defined above so the name exists when the operator is built
   trigger_dag = PythonOperator(
       task_id="trigger_dag",
       python_callable=_trigger_dag,
   )
   trigger_dag.doc_md = """triggers a new dag"""
   ```





[GitHub] [airflow] huf-92 edited a comment on issue #17343: Dags added by DagBag interrupt randomly

Posted by GitBox <gi...@apache.org>.
huf-92 edited a comment on issue #17343:
URL: https://github.com/apache/airflow/issues/17343#issuecomment-900149194


   @jedcunningham Thank you for your reply.
   Removing <code>trigger.execute({})</code> prevents the self-triggering process from starting. Perhaps I described my self-triggering process badly. As shown below, the TriggerDagRunOperator is wrapped in a function called by a PythonOperator,
   which is why trigger.execute({}) is needed. Or is there a better way to trigger the same DAG from a function called by an operator?
   ```python
   def _trigger_dag():
       # Trigger the same DAG again only while the condition holds
       if random_condition:
           trigger = TriggerDagRunOperator(
               task_id="trigger",
               trigger_dag_id=dag_name,
               conf={"group_nr": next_group_nr, "group": next_group},
           )
           trigger.execute({})
   
   # The callable is defined above so the name exists when the operator is built
   trigger_dag = PythonOperator(
       task_id="trigger_dag",
       python_callable=_trigger_dag,
   )
   trigger_dag.doc_md = """triggers a new dag"""
   ```





[GitHub] [airflow] jedcunningham commented on issue #17343: Dags added by DagBag interrupt randomly

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on issue #17343:
URL: https://github.com/apache/airflow/issues/17343#issuecomment-900681488


   If it were me, I'd use `TriggerDagRunOperator` directly and put a `ShortCircuitOperator` in front of it to check your `random_condition`, something like:
   
   ```
   check = ShortCircuitOperator(
       task_id="check",
       python_callable=lambda: random_condition,
   )
   
   trigger = TriggerDagRunOperator(
       task_id="trigger",
       trigger_dag_id=dag_name,
       conf={"group_nr": next_group_nr, "group": next_group},
   )
   
   check >> trigger
   ```
   
   Generally, I'd say using `.execute()` on an operator directly is an anti-pattern.
   
   Also, as mentioned in #17344 Airflow really does expect a single DAG folder. Finding a simpler way to do multi-tenancy would probably be a good idea.
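
The thread never shows how `random_condition` is computed, so the following is only a sketch of what the callable behind the `check` task could look like. It assumes the run's `conf` carries the `group_nr` from the issue; the `GROUPS` list and the name `has_next_group` are hypothetical. `ShortCircuitOperator` skips its downstream tasks when the callable returns a falsy value, which is what ends the self-triggering chain here.

```python
from types import SimpleNamespace

# Hypothetical list of groups to process; the issue does not show the real one.
GROUPS = ["a", "b", "c"]


def has_next_group(**context):
    """Return True while another group remains after the current run's group_nr."""
    dag_run = context.get("dag_run")
    # conf is None when a run was started without any configuration
    conf = (dag_run.conf if dag_run is not None else None) or {}
    group_nr = int(conf.get("group_nr", 0))
    return group_nr + 1 < len(GROUPS)


# Stand-ins for the DagRun object that Airflow places in the task context
first_run = SimpleNamespace(conf={"group_nr": 0, "group": "a"})
last_run = SimpleNamespace(conf={"group_nr": 2, "group": "c"})

continue_after_first = has_next_group(dag_run=first_run)
continue_after_last = has_next_group(dag_run=last_run)
```

Passed as `python_callable=has_next_group`, the function receives the task context as keyword arguments when the task runs, so the decision is made per DAG run rather than when the file is parsed.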





[GitHub] [airflow] jedcunningham commented on issue #17343: Dags added by DagBag interrupt randomly

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on issue #17343:
URL: https://github.com/apache/airflow/issues/17343#issuecomment-890215689


   Try removing the `trigger.execute({})` line. This is causing the scheduler to trigger the DAG every time the DAG is parsed (which can be roughly every 30s), probably not what you intended.
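
The distinction between parse time and run time can be illustrated without Airflow. This is a toy sketch, not the scheduler's actual mechanism: `fake_trigger`, `DAG_FILE`, and `parse` are made-up names, and `exec` merely stands in for the file being imported on each parsing pass. A call at module level fires on every pass, while a call inside a function fires only when something invokes that function.

```python
import textwrap

calls = []  # records every simulated trigger


def fake_trigger(source):
    """Hypothetical stand-in for triggering a DAG run."""
    calls.append(source)


# Hypothetical DAG file: one call inside a callable, one at module level.
DAG_FILE = textwrap.dedent('''
    def _trigger_dag():
        fake_trigger("run time")   # runs only when a task calls the function

    fake_trigger("parse time")     # runs every time the file is executed
''')


def parse(source):
    """Stand-in for one parsing pass: execute the file's top-level code."""
    namespace = {"fake_trigger": fake_trigger}
    exec(source, namespace)
    return namespace


# Simulate three parsing passes over the same file
for _ in range(3):
    parse(DAG_FILE)

parse_time_calls = calls.count("parse time")
run_time_calls = calls.count("run time")
```

After three passes the module-level call has fired three times and the call inside `_trigger_dag` has not fired at all, which is why wrapping the trigger in a task callable changes when it runs.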





[GitHub] [airflow] boring-cyborg[bot] commented on issue #17343: Dags added by DagBag interrupt randomly

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #17343:
URL: https://github.com/apache/airflow/issues/17343#issuecomment-889934056


   Thanks for opening your first issue here! Be sure to follow the issue template!
   








[GitHub] [airflow] github-actions[bot] closed issue #17343: Dags added by DagBag interrupt randomly

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed issue #17343:
URL: https://github.com/apache/airflow/issues/17343


   

