Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/08/08 05:00:39 UTC

[GitHub] [airflow] MatrixManAtYrService opened a new issue, #25586: Something about this dataset digraph prevents datasets from triggering dags

MatrixManAtYrService opened a new issue, #25586:
URL: https://github.com/apache/airflow/issues/25586

   ### Apache Airflow version
   
   main (development)
   
   ### What happened
   
   First, an apology.  I tried to replicate this with several simpler sets of dags, but they worked fine.  I don't know what it is about this particular arrangement that causes the problem.
   
   Here's a directed graph: 
   ![image](https://user-images.githubusercontent.com/5834582/183339466-67668ded-0628-4e4a-8958-d48975c6b764.png)
   
   If we interpret each node as a DAG, and each outbound edge as a task in that dag whose outlet is a unique dataset that the targeted node/DAG then schedules on, we end up with this file:
   
   ```python3
   from airflow import Dataset, DAG
   from airflow.operators.python import PythonOperator
   from datetime import datetime, timedelta
   from math import ceil
   import pause
   
   def wait_until_twenty_sec():
       now = datetime.now()
       extra = 20 - (ceil(now.second / 10) * 10)
       go_time = now + timedelta(seconds=extra)
       print(f"waiting until {go_time}")
       pause.until(go_time)
   
   a5_a14 = Dataset("a5_a14")
   a5_a12 = Dataset("a5_a12")
   a15_a14 = Dataset("a15_a14")
   a7_a6 = Dataset("a7_a6")
   a7_a8 = Dataset("a7_a8")
   a6_a2 = Dataset("a6_a2")
   a5_a10 = Dataset("a5_a10")
   a11_a1 = Dataset("a11_a1")
   a1_a12 = Dataset("a1_a12")
   a1_a2 = Dataset("a1_a2")
   a13_a5 = Dataset("a13_a5")
   a15_a9 = Dataset("a15_a9")
   a10_a4 = Dataset("a10_a4")
   a10_a11 = Dataset("a10_a11")
   a4_a15 = Dataset("a4_a15")
   a5_a6 = Dataset("a5_a6")
   a2_a7 = Dataset("a2_a7")
   a11_a8 = Dataset("a11_a8")
   a13_a9 = Dataset("a13_a9")
   a1_a3 = Dataset("a1_a3")
   a4_a14 = Dataset("a4_a14")
   a6_a15 = Dataset("a6_a15")
   a11_a2 = Dataset("a11_a2")
   a14_a8 = Dataset("a14_a8")
   a15_a12 = Dataset("a15_a12")
   a7_a14 = Dataset("a7_a14")
   a6_a4 = Dataset("a6_a4")
   a14_a12 = Dataset("a14_a12")
   a9_a5 = Dataset("a9_a5")
   a14_a9 = Dataset("a14_a9")
   a6_a1 = Dataset("a6_a1")
   a15_a1 = Dataset("a15_a1")
   
   with DAG(dag_id="a1",
            start_date=datetime(1970, 1, 1),schedule_on=[a6_a1, a11_a1, a15_a1],
   ) as a1:
   
       PythonOperator(task_id='a1_a2', python_callable=wait_until_twenty_sec, outlets=[a1_a2])
       PythonOperator(task_id='a1_a3', python_callable=wait_until_twenty_sec, outlets=[a1_a3])
       PythonOperator(task_id='a1_a12', python_callable=wait_until_twenty_sec, outlets=[a1_a12])
   
   with DAG(dag_id="a2",
            start_date=datetime(1970, 1, 1),schedule_on=[a1_a2, a6_a2, a11_a2],
   ) as a2:
   
       PythonOperator(task_id='a2_a7', python_callable=wait_until_twenty_sec, outlets=[a2_a7])
   
   with DAG(dag_id="a3",
            start_date=datetime(1970, 1, 1),schedule_on=[a1_a3],
   ) as a3:
   
       PythonOperator(task_id='no_outlets', python_callable=wait_until_twenty_sec)
   
   with DAG(dag_id="a6",
            start_date=datetime(1970, 1, 1),schedule_on=[a5_a6, a7_a6],
   ) as a6:
   
       PythonOperator(task_id='a6_a1', python_callable=wait_until_twenty_sec, outlets=[a6_a1])
       PythonOperator(task_id='a6_a2', python_callable=wait_until_twenty_sec, outlets=[a6_a2])
       PythonOperator(task_id='a6_a4', python_callable=wait_until_twenty_sec, outlets=[a6_a4])
       PythonOperator(task_id='a6_a15', python_callable=wait_until_twenty_sec, outlets=[a6_a15])
   
   with DAG(dag_id="a11",
            start_date=datetime(1970, 1, 1),schedule_on=[a10_a11],
   ) as a11:
   
       PythonOperator(task_id='a11_a1', python_callable=wait_until_twenty_sec, outlets=[a11_a1])
       PythonOperator(task_id='a11_a2', python_callable=wait_until_twenty_sec, outlets=[a11_a2])
       PythonOperator(task_id='a11_a8', python_callable=wait_until_twenty_sec, outlets=[a11_a8])
   
   with DAG(dag_id="a12",
            start_date=datetime(1970, 1, 1),schedule_on=[a1_a12, a5_a12, a14_a12, a15_a12],
   ) as a12:
   
       PythonOperator(task_id='no_outlets', python_callable=wait_until_twenty_sec)
   
   with DAG(dag_id="a15",
            start_date=datetime(1970, 1, 1),schedule_on=[a4_a15, a6_a15],
   ) as a15:
   
       PythonOperator(task_id='a15_a1', python_callable=wait_until_twenty_sec, outlets=[a15_a1])
       PythonOperator(task_id='a15_a9', python_callable=wait_until_twenty_sec, outlets=[a15_a9])
       PythonOperator(task_id='a15_a12', python_callable=wait_until_twenty_sec, outlets=[a15_a12])
       PythonOperator(task_id='a15_a14', python_callable=wait_until_twenty_sec, outlets=[a15_a14])
   
   with DAG(dag_id="a7",
            start_date=datetime(1970, 1, 1),schedule_on=[a2_a7],
   ) as a7:
   
       PythonOperator(task_id='a7_a6', python_callable=wait_until_twenty_sec, outlets=[a7_a6])
       PythonOperator(task_id='a7_a8', python_callable=wait_until_twenty_sec, outlets=[a7_a8])
       PythonOperator(task_id='a7_a14', python_callable=wait_until_twenty_sec, outlets=[a7_a14])
   
   with DAG(dag_id="a4",
            start_date=datetime(1970, 1, 1),schedule_on=[a6_a4, a10_a4],
   ) as a4:
   
       PythonOperator(task_id='a4_a14', python_callable=wait_until_twenty_sec, outlets=[a4_a14])
       PythonOperator(task_id='a4_a15', python_callable=wait_until_twenty_sec, outlets=[a4_a15])
   
   with DAG(dag_id="a10",
            start_date=datetime(1970, 1, 1),schedule_on=[a5_a10],
   ) as a10:
   
       PythonOperator(task_id='a10_a4', python_callable=wait_until_twenty_sec, outlets=[a10_a4])
       PythonOperator(task_id='a10_a11', python_callable=wait_until_twenty_sec, outlets=[a10_a11])
   
   with DAG(dag_id="a14",
            start_date=datetime(1970, 1, 1),schedule_on=[a4_a14, a5_a14, a7_a14, a15_a14],
   ) as a14:
   
       PythonOperator(task_id='a14_a8', python_callable=wait_until_twenty_sec, outlets=[a14_a8])
       PythonOperator(task_id='a14_a9', python_callable=wait_until_twenty_sec, outlets=[a14_a9])
       PythonOperator(task_id='a14_a12', python_callable=wait_until_twenty_sec, outlets=[a14_a12])
   
   with DAG(dag_id="a5",
            start_date=datetime(1970, 1, 1),schedule_on=[a9_a5, a13_a5],
   ) as a5:
   
       PythonOperator(task_id='a5_a6', python_callable=wait_until_twenty_sec, outlets=[a5_a6])
       PythonOperator(task_id='a5_a10', python_callable=wait_until_twenty_sec, outlets=[a5_a10])
       PythonOperator(task_id='a5_a12', python_callable=wait_until_twenty_sec, outlets=[a5_a12])
       PythonOperator(task_id='a5_a14', python_callable=wait_until_twenty_sec, outlets=[a5_a14])
   
   with DAG(dag_id="a9",
            start_date=datetime(1970, 1, 1),schedule_on=[a13_a9, a14_a9, a15_a9],
   ) as a9:
   
       PythonOperator(task_id='a9_a5', python_callable=wait_until_twenty_sec, outlets=[a9_a5])
   
   with DAG(dag_id="start_a13",
            start_date=datetime(1970, 1, 1),schedule_interval=None,
   ) as start_a13:
   
       PythonOperator(task_id='a13_a5', python_callable=wait_until_twenty_sec, outlets=[a13_a5])
       PythonOperator(task_id='a13_a9', python_callable=wait_until_twenty_sec, outlets=[a13_a9])
   
   with DAG(dag_id="a8",
            start_date=datetime(1970, 1, 1),schedule_on=[a7_a8, a11_a8, a14_a8],
   ) as a8:
   
       PythonOperator(task_id='no_outlets', python_callable=wait_until_twenty_sec)
   
   ```
   
   Note that the DAG called `start_a13` has two tasks whose outlets are the following datasets:
   - `a13_a5`
   - `a13_a9`
   
   DAGs `a5` and `a9` then list those datasets in their `schedule_on` arguments.  So I would expect that if I run `start_a13` and wait around for a bit, `a5` and `a9` should run.  But that doesn't happen: `start_a13` runs and completes successfully, but the other dags don't run.
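
   Here's a minimal sketch of how I'd confirm that the outlet tasks actually recorded dataset events (assuming this development build exposes the `DatasetEvent`/`DatasetModel` ORM models and `airflow.utils.session.create_session` the way I expect; treat it as a sketch, not something I've verified against main):

   ```python3
   from airflow.models.dataset import DatasetEvent, DatasetModel
   from airflow.utils.session import create_session

   # Print every dataset event recorded in the metadata DB, oldest first.
   with create_session() as session:
       events = (
           session.query(DatasetModel.uri, DatasetEvent.timestamp)
           .join(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id)
           .order_by(DatasetEvent.timestamp)
           .all()
       )
       for uri, ts in events:
           print(f"{ts}  {uri}")
   ```

   If events show up for `a13_a5` and `a13_a9` but `a5`/`a9` still never run, then the events are being recorded but not acted on.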
   
   
   The scheduler logs don't show any errors; they just kind of stop:
   ```
   venv ❯ airflow scheduler
     ____________       _____________
    ____    |__( )_________  __/__  /________      __
   ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
   ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
    _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   {scheduler_job.py:692} INFO - Starting the scheduler
   {scheduler_job.py:697} INFO - Processing each file at most -1 times
   [INFO] Starting gunicorn 20.1.0
   [INFO] Listening at: http://[::]:8793 (71751)
   [INFO] Using worker: sync
   {executor_loader.py:105} INFO - Loaded executor: LocalExecutor
   [INFO] Booting worker with pid: 71752
   [INFO] Booting worker with pid: 71765
   {manager.py:160} INFO - Launched DagFileProcessorManager with pid: 71889
   {scheduler_job.py:1233} INFO - Resetting orphaned tasks for active dag runs
   {settings.py:55} INFO - Configured default timezone Timezone('UTC')
   {scheduler_job.py:341} INFO - 2 tasks up for execution:
   	<TaskInstance: start_a13.a13_a5 manual__2022-08-08T04:44:13.755620+00:00 [scheduled]>
   	<TaskInstance: start_a13.a13_a9 manual__2022-08-08T04:44:13.755620+00:00 [scheduled]>
   {scheduler_job.py:406} INFO - DAG start_a13 has 0/16 running and queued tasks
   {scheduler_job.py:406} INFO - DAG start_a13 has 1/16 running and queued tasks
   {scheduler_job.py:492} INFO - Setting the following tasks to queued state:
   	<TaskInstance: start_a13.a13_a5 manual__2022-08-08T04:44:13.755620+00:00 [scheduled]>
   	<TaskInstance: start_a13.a13_a9 manual__2022-08-08T04:44:13.755620+00:00 [scheduled]>
   {scheduler_job.py:531} INFO - Sending TaskInstanceKey(dag_id='start_a13', task_id='a13_a5', run_id='manual__2022-08-08T04:44:13.755620+00:00', try_number=1, map_index=-1) to executor with priority 1 and queue default
   {base_executor.py:92} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'start_a13', 'a13_a5', 'manual__2022-08-08T04:44:13.755620+00:00', '--local', '--subdir', 'DAGS_FOLDER/dataset_network_A.py']
   {scheduler_job.py:531} INFO - Sending TaskInstanceKey(dag_id='start_a13', task_id='a13_a9', run_id='manual__2022-08-08T04:44:13.755620+00:00', try_number=1, map_index=-1) to executor with priority 1 and queue default
   {base_executor.py:92} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'start_a13', 'a13_a9', 'manual__2022-08-08T04:44:13.755620+00:00', '--local', '--subdir', 'DAGS_FOLDER/dataset_network_A.py']
   {local_executor.py:79} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'start_a13', 'a13_a5', 'manual__2022-08-08T04:44:13.755620+00:00', '--local', '--subdir', 'DAGS_FOLDER/dataset_network_A.py']
   {local_executor.py:79} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'start_a13', 'a13_a9', 'manual__2022-08-08T04:44:13.755620+00:00', '--local', '--subdir', 'DAGS_FOLDER/dataset_network_A.py']
   {task_command.py:378} INFO - Running <TaskInstance: start_a13.a13_a5 manual__2022-08-08T04:44:13.755620+00:00 [queued]> on host ChoedanKal
   {task_command.py:378} INFO - Running <TaskInstance: start_a13.a13_a9 manual__2022-08-08T04:44:13.755620+00:00 [queued]> on host ChoedanKal
   {scheduler_job.py:583} INFO - Executor reports execution of start_a13.a13_a5 run_id=manual__2022-08-08T04:44:13.755620+00:00 exited with status success for try_number 1
   {scheduler_job.py:626} INFO - TaskInstance Finished: dag_id=start_a13, task_id=a13_a5, run_id=manual__2022-08-08T04:44:13.755620+00:00, map_index=-1, run_start_date=2022-08-08 04:44:15.281603+00:00, run_end_date=2022-08-08 04:44:16.121190+00:00, run_duration=0.839587, state=success, executor_state=success, try_number=1, max_tries=0, job_id=17, pool=default_pool, queue=default, priority_weight=1, operator=PythonOperator, queued_dttm=2022-08-08 04:44:14.164815+00:00, queued_by_job_id=15, pid=71985
   {dagrun.py:567} INFO - Marking run <DagRun start_a13 @ 2022-08-08 04:44:13.755620+00:00: manual__2022-08-08T04:44:13.755620+00:00, state:running, queued_at: 2022-08-08 04:44:13.817611+00:00. externally triggered: True> successful
   {dagrun.py:612} INFO - DagRun Finished: dag_id=start_a13, execution_date=2022-08-08 04:44:13.755620+00:00, run_id=manual__2022-08-08T04:44:13.755620+00:00, run_start_date=2022-08-08 04:44:13.960460+00:00, run_end_date=2022-08-08 04:44:17.579620+00:00, run_duration=3.61916, state=success, external_trigger=True, run_type=manual, data_interval_start=2022-08-08 04:44:13.755620+00:00, data_interval_end=2022-08-08 04:44:13.755620+00:00, dag_hash=254f7fa18d109067953eb15cf420949c
   {dag.py:3178} INFO - Setting next_dagrun for start_a13 to None, run_after=None
   {scheduler_job.py:583} INFO - Executor reports execution of start_a13.a13_a9 run_id=manual__2022-08-08T04:44:13.755620+00:00 exited with status success for try_number 1
   {scheduler_job.py:626} INFO - TaskInstance Finished: dag_id=start_a13, task_id=a13_a9, run_id=manual__2022-08-08T04:44:13.755620+00:00, map_index=-1, run_start_date=2022-08-08 04:44:15.470887+00:00, run_end_date=2022-08-08 04:44:16.354373+00:00, run_duration=0.883486, state=success, executor_state=success, try_number=1, max_tries=0, job_id=16, pool=default_pool, queue=default, priority_weight=1, operator=PythonOperator, queued_dttm=2022-08-08 04:44:14.164815+00:00, queued_by_job_id=15, pid=71988
   [INFO] Handling signal: winch
   
   ```
   
   ### What you think should happen instead
   
   The dags `a5` and `a9` should run.
   
   ### How to reproduce
   
   Add the DAG file above, trigger `start_a13` manually, and wait for `a5` and `a9` to run.
   
   ### Operating System
   
   NixOS (linux)
   
   ### Versions of Apache Airflow Providers
   
   n/a
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   I'm using the LocalExecutor (not sure if that's relevant).
   
   Also, here's a dump of my database.  Based on the dag dependency view, it looks like Airflow is aware of the dependency.  And the tasks with the outlets were successful, so I'm not sure why the manually triggered dag was the only one that ran.
   
   [dump.sql.gz](https://github.com/apache/airflow/files/9278812/dump.sql.gz)
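
   For what it's worth, here's a sketch of how the per-dag pending dataset updates could be read out of the metadata database (the same data that's in the dump), assuming this build already has a `DatasetDagRunQueue` model with `target_dag_id`/`dataset_id` columns, which I haven't verified:

   ```python3
   from airflow.models.dataset import DatasetDagRunQueue, DatasetModel
   from airflow.utils.session import create_session

   # For each dag that schedules on datasets, list the dataset updates the
   # scheduler has recorded for it so far.
   with create_session() as session:
       rows = (
           session.query(DatasetDagRunQueue.target_dag_id, DatasetModel.uri)
           .join(DatasetModel, DatasetModel.id == DatasetDagRunQueue.dataset_id)
           .order_by(DatasetDagRunQueue.target_dag_id)
           .all()
       )
       for dag_id, uri in rows:
           print(f"{dag_id}: {uri}")
   ```

   (My possibly-wrong understanding is that the scheduler consults that table when deciding whether a dataset-scheduled dag is due.)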
   
   
   ### Anything else
   
   In case you're interested in generating other messy dags, here's a gist with the generator script: https://gist.github.com/MatrixManAtYrService/52f25558499189848f75dc1ece0c42a2
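
   Something along these lines would generate similar files (an illustrative sketch only, not the actual gist; the `emit_network` helper and the 0.3 edge probability are made up for this example):

   ```python3
   import random

   def emit_network(n_dags: int = 6, seed: int = 0) -> str:
       """Emit a DAG file shaped like the one above: each random edge i -> j
       becomes a Dataset("ai_aj") that dag ai outlets and dag aj schedules on."""
       random.seed(seed)
       edges = sorted(
           (i, j)
           for i in range(1, n_dags + 1)
           for j in range(1, n_dags + 1)
           if i != j and random.random() < 0.3
       )
       lines = [
           "from datetime import datetime",
           "from airflow import DAG, Dataset",
           "from airflow.operators.empty import EmptyOperator",
           "",
       ]
       lines += [f'a{i}_a{j} = Dataset("a{i}_a{j}")' for i, j in edges]
       for d in range(1, n_dags + 1):
           inlets = [f"a{i}_a{j}" for i, j in edges if j == d]
           schedule = f"schedule_on=[{', '.join(inlets)}]" if inlets else "schedule_interval=None"
           lines += ["", f'with DAG(dag_id="a{d}", start_date=datetime(1970, 1, 1), {schedule}):']
           outlets = [j for i, j in edges if i == d]
           if outlets:
               lines += [f'    EmptyOperator(task_id="a{d}_a{j}", outlets=[a{d}_a{j}])' for j in outlets]
           else:
               lines += ['    EmptyOperator(task_id="no_outlets")']
       return "\n".join(lines)

   print(emit_network())
   ```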
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] MatrixManAtYrService commented on issue #25586: Something about this dataset digraph prevents datasets from triggering dags

Posted by GitBox <gi...@apache.org>.
MatrixManAtYrService commented on issue #25586:
URL: https://github.com/apache/airflow/issues/25586#issuecomment-1208938356

   I can't be 100% sure it's the same thing, but I'm seeing similar behavior: the scheduler just kind of stops without explanation with this (much simpler) dag:
   
   ```python3
   from airflow import Dataset, DAG
   from airflow.operators.python import PythonOperator
   from datetime import datetime, timedelta
   from math import ceil
   import pause
   
   def wait_until_twenty_sec():
       now = datetime.now()
       minutes = 0
       if now.second < 15:
           seconds = 20
       elif now.second < 35:
           seconds = 40
       elif now.second < 55:
           seconds = 0
           minutes = +1
       else:
           seconds = 20
           minutes += 1
       go_time = now + timedelta(minutes=minutes)
       go_time = datetime(
           year=go_time.year,
           month=go_time.month,
           day=go_time.day,
           hour=go_time.hour,
           minute=go_time.minute,
           second=seconds,
       )
       print(f"waiting until {go_time}")
       pause.until(go_time)
   
   a3_a2 = Dataset("a3_a2")
   a1_a2 = Dataset("a1_a2")
   a1_a3 = Dataset("a1_a3")
   
   with DAG(dag_id="start_a1",
            start_date=datetime(1970, 1, 1),schedule_interval=None,
   ) as start_a1:
   
       PythonOperator(task_id='a1_a2', python_callable=wait_until_twenty_sec, outlets=[a1_a2])
       PythonOperator(task_id='a1_a3', python_callable=wait_until_twenty_sec, outlets=[a1_a3])
   
   with DAG(dag_id="a2",
            start_date=datetime(1970, 1, 1),schedule_on=[a1_a2, a3_a2],
   ) as a2:
   
       PythonOperator(task_id='no_outlets', python_callable=wait_until_twenty_sec)
   
   with DAG(dag_id="a3",
            start_date=datetime(1970, 1, 1),schedule_on=[a1_a3],
   ) as a3:
   
       PythonOperator(task_id='a3_a2', python_callable=wait_until_twenty_sec, outlets=[a3_a2])
   ```
   
   In this case it gets stuck with one task queued:
   
   ![stuck](https://user-images.githubusercontent.com/5834582/183572594-c8916b68-f7e5-4208-baa4-199283df86e2.png)
   
   Again, scheduler logs are uninteresting:
   ```
   venv ❯ airflow scheduler
     ____________       _____________
    ____    |__( )_________  __/__  /________      __
   ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
   ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
    _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   {scheduler_job.py:692} INFO - Starting the scheduler
   {scheduler_job.py:697} INFO - Processing each file at most -1 times
   [51831] [INFO] Starting gunicorn 20.1.0
   [51831] [INFO] Listening at: http://[::]:8793 (51831)
   [51831] [INFO] Using worker: sync
   {executor_loader.py:105} INFO - Loaded executor: LocalExecutor
   [51833] [INFO] Booting worker with pid: 51833
   [51843] [INFO] Booting worker with pid: 51843
   {manager.py:160} INFO - Launched DagFileProcessorManager with pid: 51969
   {scheduler_job.py:1233} INFO - Resetting orphaned tasks for active dag runs
   {settings.py:55} INFO - Configured default timezone Timezone('UTC')
   {scheduler_job.py:341} INFO - 2 tasks up for execution:
   	<TaskInstance: start_a1.a1_a2 manual__2022-08-09T05:31:45.907115+00:00 [scheduled]>
   	<TaskInstance: start_a1.a1_a3 manual__2022-08-09T05:31:45.907115+00:00 [scheduled]>
   {scheduler_job.py:406} INFO - DAG start_a1 has 0/16 running and queued tasks
   {scheduler_job.py:406} INFO - DAG start_a1 has 1/16 running and queued tasks
   {scheduler_job.py:492} INFO - Setting the following tasks to queued state:
   	<TaskInstance: start_a1.a1_a2 manual__2022-08-09T05:31:45.907115+00:00 [scheduled]>
   	<TaskInstance: start_a1.a1_a3 manual__2022-08-09T05:31:45.907115+00:00 [scheduled]>
   {scheduler_job.py:531} INFO - Sending TaskInstanceKey(dag_id='start_a1', task_id='a1_a2', run_id='manual__2022-08-09T05:31:45.907115+00:00', try_number=1, map_index=-1) to executor with priority 1 and queue default
   {base_executor.py:92} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'start_a1', 'a1_a2', 'manual__2022-08-09T05:31:45.907115+00:00', '--local', '--subdir', 'DAGS_FOLDER/dataset_network_A.py']
   {scheduler_job.py:531} INFO - Sending TaskInstanceKey(dag_id='start_a1', task_id='a1_a3', run_id='manual__2022-08-09T05:31:45.907115+00:00', try_number=1, map_index=-1) to executor with priority 1 and queue default
   {base_executor.py:92} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'start_a1', 'a1_a3', 'manual__2022-08-09T05:31:45.907115+00:00', '--local', '--subdir', 'DAGS_FOLDER/dataset_network_A.py']
   {local_executor.py:79} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'start_a1', 'a1_a2', 'manual__2022-08-09T05:31:45.907115+00:00', '--local', '--subdir', 'DAGS_FOLDER/dataset_network_A.py']
   {local_executor.py:79} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'start_a1', 'a1_a3', 'manual__2022-08-09T05:31:45.907115+00:00', '--local', '--subdir', 'DAGS_FOLDER/dataset_network_A.py']
   {task_command.py:378} INFO - Running <TaskInstance: start_a1.a1_a2 manual__2022-08-09T05:31:45.907115+00:00 [queued]> on host ChoedanKal
   {task_command.py:378} INFO - Running <TaskInstance: start_a1.a1_a3 manual__2022-08-09T05:31:45.907115+00:00 [queued]> on host ChoedanKal
   {dagrun.py:567} INFO - Marking run <DagRun start_a1 @ 2022-08-09 05:31:45.907115+00:00: manual__2022-08-09T05:31:45.907115+00:00, state:running, queued_at: 2022-08-09 05:31:46.059843+00:00. externally triggered: True> successful
   {dagrun.py:612} INFO - DagRun Finished: dag_id=start_a1, execution_date=2022-08-09 05:31:45.907115+00:00, run_id=manual__2022-08-09T05:31:45.907115+00:00, run_start_date=2022-08-09 05:31:58.934238+00:00, run_end_date=2022-08-09 05:32:20.411708+00:00, run_duration=21.47747, state=success, external_trigger=True, run_type=manual, data_interval_start=2022-08-09 05:31:45.907115+00:00, data_interval_end=2022-08-09 05:31:45.907115+00:00, dag_hash=cf8aacf64e7f829b69d2b2baf4ba97fb
   {dag.py:3178} INFO - Setting next_dagrun for start_a1 to None, run_after=None
   {scheduler_job.py:583} INFO - Executor reports execution of start_a1.a1_a3 run_id=manual__2022-08-09T05:31:45.907115+00:00 exited with status success for try_number 1
   {scheduler_job.py:583} INFO - Executor reports execution of start_a1.a1_a2 run_id=manual__2022-08-09T05:31:45.907115+00:00 exited with status success for try_number 1
   {scheduler_job.py:626} INFO - TaskInstance Finished: dag_id=start_a1, task_id=a1_a2, run_id=manual__2022-08-09T05:31:45.907115+00:00, map_index=-1, run_start_date=2022-08-09 05:32:00.297901+00:00, run_end_date=2022-08-09 05:32:20.156646+00:00, run_duration=19.858745, state=success, executor_state=success, try_number=1, max_tries=0, job_id=3, pool=default_pool, queue=default, priority_weight=1, operator=PythonOperator, queued_dttm=2022-08-09 05:31:59.210223+00:00, queued_by_job_id=1, pid=52002
   {scheduler_job.py:626} INFO - TaskInstance Finished: dag_id=start_a1, task_id=a1_a3, run_id=manual__2022-08-09T05:31:45.907115+00:00, map_index=-1, run_start_date=2022-08-09 05:32:00.479911+00:00, run_end_date=2022-08-09 05:32:20.058977+00:00, run_duration=19.579066, state=success, executor_state=success, try_number=1, max_tries=0, job_id=2, pool=default_pool, queue=default, priority_weight=1, operator=PythonOperator, queued_dttm=2022-08-09 
   ```


[GitHub] [airflow] MatrixManAtYrService commented on issue #25586: Something about this dataset digraph prevents datasets from triggering dags

Posted by GitBox <gi...@apache.org>.
MatrixManAtYrService commented on issue #25586:
URL: https://github.com/apache/airflow/issues/25586#issuecomment-1209773695

   I had a wrong assumption: I thought a dag would run if *any* of the datasets it schedules on had been updated, when in fact it only runs once *all* of them have.  Since `a5` and `a9` each schedule on datasets that `start_a13` doesn't produce, they were still waiting, so this is working as designed.
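
   For anyone else who trips over this, here's a minimal illustration of the difference (written against the same in-development `schedule_on` API as the dags above, and assuming `EmptyOperator` is available; adjust to whatever the build actually expects):

   ```python3
   from datetime import datetime

   from airflow import DAG, Dataset
   from airflow.operators.empty import EmptyOperator

   one = Dataset("one")
   two = Dataset("two")

   # Runs as soon as "one" receives an update.
   with DAG(dag_id="needs_one", start_date=datetime(1970, 1, 1), schedule_on=[one]):
       EmptyOperator(task_id="noop")

   # Does NOT run until both "one" and "two" have each received an update
   # since its previous dataset-triggered run.
   with DAG(dag_id="needs_both", start_date=datetime(1970, 1, 1), schedule_on=[one, two]):
       EmptyOperator(task_id="noop")
   ```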
   


[GitHub] [airflow] MatrixManAtYrService closed issue #25586: Something about this dataset digraph prevents datasets from triggering dags

Posted by GitBox <gi...@apache.org>.
MatrixManAtYrService closed issue #25586: Something about this dataset digraph prevents datasets from triggering dags
URL: https://github.com/apache/airflow/issues/25586

