Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/07/30 14:33:34 UTC
[GitHub] [airflow] huf-92 opened a new issue #17343: Dags added by DagBag interrupt randomly
huf-92 opened a new issue #17343:
URL: https://github.com/apache/airflow/issues/17343
**Apache Airflow version**: 2.1.2
**Kubernetes version**: not used
**Environment**:
- **Cloud provider or hardware configuration**:
- **OS**: CentOS Stream 8
- **Kernel**: Linux 4.18.0-315.el8.x86_64
- **Install tools**:
- **Others**:
**What happened**:
I added this script
import logging
from airflow.models import DagBag
# folders with dag definitions to add
dags_dirs = ["random/path"]
for dag_dir in dags_dirs:
    # add each DAG folder as DagBag
    dag_bag = DagBag(dag_dir)
    if dag_bag:
        for dag_id, dag in dag_bag.dags.items():
            globals()[dag_id] = dag
    else:
        logging.error("dag_bag error: %s", str(dag_bag))
to my Airflow Dags folder so that Dags stored in other locations are loaded
as well. The code above successfully adds all my Dags in random/path, and I
can trigger them. Some of the Dags contain a task with
trigger = TriggerDagRunOperator(
    task_id="trigger",
    trigger_dag_id=dag_name,
    conf={"group_nr": next_group_nr, "group": next_group},
)
trigger.execute({})
which triggers the Dag itself, just with a different configuration. The
self-triggering should stop once certain conditions are fulfilled.
Unfortunately, this process stops at random with the error message
*** Reading local file: /opt/airflow/logs/random_dag/generate_new_dag/2021-07-29T11:00:41.783999+00:00/1.log
[2021-07-29 13:04:35,678] {standard_task_runner.py:52} INFO - Started process 2649819 to run task
[2021-07-29 13:04:35,684] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'random_dag', 'generate_new_dag', '2021-07-29T11:00:41.783999+00:00', '--job-id', '25286', '--pool', 'default_pool', '--raw', '--subdir', '/opt/airflow-dags/random_folder/dags/random_dag.py', '--cfg-path', '/tmp/tmpx_0j_xtr', '--error-file', '/tmp/tmpnpudbc15']
[2021-07-29 13:04:35,685] {standard_task_runner.py:77} INFO - Job 25286: Subtask generate_new_dag
[2021-07-29 13:04:35,964] {dag_generator.py:69} ERROR - DAG 'random_dag' not found in serialized_dag table
[2021-07-29 13:04:36,013] {local_task_job.py:149} INFO - Task exited with return code 1
My issue could be related to [#13504](https://github.com/apache/airflow/issues/13504)
with the difference that in #13504 the Dags were not added by a DagBag.
**What you expected to happen**:
When I keep all my Dags in the Airflow Dags folder, everything works fine
without random interruptions. The same should be true for Dags added
by a DagBag.
<!-- What do you think went wrong? -->
I guess the self-triggering process is interrupted randomly because my Dags
are triggered at the same time the scheduler reloads the Dags in random/path.
Therefore the TriggerDagRunOperator should wait until all Dags are reloaded by
the scheduler.
**How to reproduce it**:
Add a Dag via a DagBag that keeps triggering itself, and wait until it is
interrupted.
**Anything else we need to know**:
No
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] github-actions[bot] commented on issue #17343: Dags added by DagBag interrupt randomly
Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on issue #17343:
URL: https://github.com/apache/airflow/issues/17343#issuecomment-926249675
This issue has been closed because it has not received response from the issue author.
[GitHub] [airflow] github-actions[bot] commented on issue #17343: Dags added by DagBag interrupt randomly
Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on issue #17343:
URL: https://github.com/apache/airflow/issues/17343#issuecomment-921344500
This issue has been automatically marked as stale because it has been open for 30 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.
[GitHub] [airflow] huf-92 commented on issue #17343: Dags added by DagBag interrupt randomly
Posted by GitBox <gi...@apache.org>.
huf-92 commented on issue #17343:
URL: https://github.com/apache/airflow/issues/17343#issuecomment-900149194
Thank you for your fast reply.
Removing `trigger.execute({})` prevents the self-triggering process from starting. Perhaps I described my self-triggering process badly. As shown below, the TriggerDagRunOperator is wrapped in a function that is called by a PythonOperator,
which is why `trigger.execute({})` is needed. Or is there a better way to trigger the same DAG from a function called by an operator?
```python
# Defined before the operator that references it.
def _trigger_dag():
    if random_condition:
        trigger = TriggerDagRunOperator(
            task_id="trigger",
            trigger_dag_id=dag_name,
            conf={"group_nr": next_group_nr, "group": next_group},
        )
        trigger.execute({})
    else:
        return

trigger_dag = PythonOperator(
    task_id="trigger_dag",
    python_callable=_trigger_dag,
)
trigger_dag.doc_md = """triggers a new dag"""
```
[GitHub] [airflow] huf-92 edited a comment on issue #17343: Dags added by DagBag interrupt randomly
Posted by GitBox <gi...@apache.org>.
huf-92 edited a comment on issue #17343:
URL: https://github.com/apache/airflow/issues/17343#issuecomment-900149194
@jedcunningham Thank you for your reply.
Removing `trigger.execute({})` prevents the self-triggering process from starting. Perhaps I described my self-triggering process badly. As shown below, the TriggerDagRunOperator is wrapped in a function that is called by a PythonOperator,
which is why `trigger.execute({})` is needed. Or is there a better way to trigger the same DAG from a function called by an operator?
```python
# Defined before the operator that references it.
def _trigger_dag():
    if random_condition:
        trigger = TriggerDagRunOperator(
            task_id="trigger",
            trigger_dag_id=dag_name,
            conf={"group_nr": next_group_nr, "group": next_group},
        )
        trigger.execute({})
    else:
        return

trigger_dag = PythonOperator(
    task_id="trigger_dag",
    python_callable=_trigger_dag,
)
trigger_dag.doc_md = """triggers a new dag"""
```
[GitHub] [airflow] jedcunningham commented on issue #17343: Dags added by DagBag interrupt randomly
Posted by GitBox <gi...@apache.org>.
jedcunningham commented on issue #17343:
URL: https://github.com/apache/airflow/issues/17343#issuecomment-900681488
If it were me, I'd use `TriggerDagRunOperator` directly and put a `ShortCircuitOperator` in front of it to check your `random_condition`, something like:
```python
check = ShortCircuitOperator(
    task_id="check",
    python_callable=lambda: random_condition,
)
trigger = TriggerDagRunOperator(
    task_id="trigger",
    trigger_dag_id=dag_name,
    conf={"group_nr": next_group_nr, "group": next_group},
)
check >> trigger
```
Generally, I'd say using `.execute()` on an operator directly is an anti-pattern.
Also, as mentioned in #17344 Airflow really does expect a single DAG folder. Finding a simpler way to do multi-tenancy would probably be a good idea.
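For the `check` callable, one point worth spelling out is that the stop condition has to be evaluated when the task runs, not when the file is parsed. Below is a minimal sketch of such a callable, assuming the condition depends on the `group_nr` value passed through `conf`. The name `should_continue` and the `TOTAL_GROUPS` limit are hypothetical and not taken from the issue; the `SimpleNamespace` object only stands in for the `dag_run` entry of the context that Airflow passes to the callable.

```python
from types import SimpleNamespace

# Hypothetical upper bound; the thread does not say how many groups exist.
TOTAL_GROUPS = 3


def should_continue(**context):
    """Return True while another group remains, False to short-circuit."""
    # dag_run.conf holds the dict given to TriggerDagRunOperator(conf=...).
    # It can be None or empty on a run that was started without conf.
    conf = context["dag_run"].conf or {}
    group_nr = int(conf.get("group_nr", 0))
    return group_nr + 1 < TOTAL_GROUPS


# Stand-in for the run-time context, so the logic can be exercised
# without a scheduler.
fake_context = {"dag_run": SimpleNamespace(conf={"group_nr": 1})}
print(should_continue(**fake_context))  # True: group 2 is still pending
```

A function like this could be passed as `python_callable` to the `ShortCircuitOperator` in place of the lambda, so that a falsy return value skips the downstream `trigger` task and ends the chain.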
[GitHub] [airflow] jedcunningham commented on issue #17343: Dags added by DagBag interrupt randomly
Posted by GitBox <gi...@apache.org>.
jedcunningham commented on issue #17343:
URL: https://github.com/apache/airflow/issues/17343#issuecomment-890215689
Try removing the `trigger.execute({})` line. This is causing the scheduler to trigger the DAG every time the DAG is parsed (which can be roughly every 30s), probably not what you intended.
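To illustrate why a top-level call fires on every parse, here is a small simulation in plain Python. It is only a sketch: `DAG_FILE_SOURCE`, `parse_dag_file` and `triggered` are made-up names, and `exec` is a simplified stand-in for the way a DAG file's module body is executed each time the file is loaded, not Airflow's actual loading code.

```python
# A stand-in for a DAG file whose top level performs an action, in the same
# way a bare `trigger.execute({})` at module level would.
DAG_FILE_SOURCE = """
triggered.append("run requested at parse time")  # top-level side effect

def task_callable():
    triggered.append("run requested at task time")  # only runs when called
"""

triggered = []


def parse_dag_file(source):
    """Execute the file body once, as an import or re-parse would."""
    namespace = {"triggered": triggered}
    exec(source, namespace)
    return namespace


# Three parse cycles, none of which runs the task itself.
for _ in range(3):
    parse_dag_file(DAG_FILE_SOURCE)

print(len(triggered))  # 3: one side effect per parse
```

Code inside `task_callable` stays inert until something calls it, which is the difference between work done in an operator's callable and work done in the module body.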
[GitHub] [airflow] boring-cyborg[bot] commented on issue #17343: Dags added by DagBag interrupt randomly
Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #17343:
URL: https://github.com/apache/airflow/issues/17343#issuecomment-889934056
Thanks for opening your first issue here! Be sure to follow the issue template!
[GitHub] [airflow] github-actions[bot] closed issue #17343: Dags added by DagBag interrupt randomly
Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed issue #17343:
URL: https://github.com/apache/airflow/issues/17343