Posted to commits@airflow.apache.org by "sangrpatil2 (via GitHub)" <gi...@apache.org> on 2023/02/15 15:48:33 UTC

[GitHub] [airflow] sangrpatil2 opened a new issue, #29555: Dag Fails while creating a Dynamic Tasks using airflow variables.

sangrpatil2 opened a new issue, #29555:
URL: https://github.com/apache/airflow/issues/29555

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   We have a dynamic DAG with a few Task Groups, and each of these contains dynamic tasks that depend on Airflow Variables. In the Variables we store a list of batches to execute.
   
   For example, the Airflow Variable `batch_list` is `[1, 2]`. Suppose we have Task Groups A, B and C. Each batch is then executed within each of these Task Groups (as shown below).
   
   ![image](https://user-images.githubusercontent.com/125476106/219056114-d535577d-2bf1-4fa2-901c-2fc92a021e3e.png)
   
   Tasks/Steps: 
    
   
   - **Start:** It just collects the config details sent to the DAG.
   
   - **Fetch Batch:** It queries a few tables and prepares the list of batches to execute.
   
   - **Load Batch:** It updates the Airflow Variable with the latest batch list fetched in the previous step.
   
   - **A/B/C:** These are Task Groups. Each of them contains dynamic tasks that depend on `batch_list` from the Airflow Variables.
   
   - **A1, A2, B1, B2 ...:** These are the dynamic tasks, which execute the Spark script, generate the dataset and write it to S3.
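   
   For context, a rough sketch of this pattern (a minimal, illustrative example, not our exact production code; the operator, callable and DAG names here are placeholders, and our real snippet is shared further down in this thread):
   
   ```
   import json
   from datetime import datetime
   
   from airflow import DAG
   from airflow.models import Variable
   from airflow.operators.python import PythonOperator
   from airflow.utils.task_group import TaskGroup
   
   # The batch list is read from the Airflow Variable at DAG-parse time,
   # so the set of tasks is fixed whenever the scheduler parses this file.
   batch_list = json.loads(Variable.get("batch_list", default_var="[1, 2]"))
   
   
   def run_batch(batch_id, **kwargs):
       # Placeholder for the real Spark/SSM call.
       print(f"Running batch {batch_id}")
   
   
   with DAG(dag_id="batch_domains_example", start_date=datetime(2023, 1, 1), schedule_interval=None) as dag:
       for domain in ["A", "B", "C"]:
           with TaskGroup(group_id=domain):
               previous = None
               for batch_id in batch_list:
                   t = PythonOperator(
                       task_id=f"load_{domain}_{batch_id}",
                       python_callable=run_batch,
                       op_kwargs={"batch_id": batch_id},
                   )
                   if previous:
                       previous >> t
                   previous = t
   ```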
   
   But in some runs it does not refresh the tasks according to the latest variable value (batches). Instead, it tries to execute the old tasks from the previous run and fails with the error `Dependencies not met for <TaskInstance: DAG-NAME.A1>`.
   
   For example, in the first run it executed `batch_list: [1, 2]`. In the next run, with `batch_list: [3, 4]`, it might fail in one of the scenarios below:
   
   - It marks the tasks within the Task Group as failed and all the downstream steps as `upstream_failed`. On page refresh you can see that it updates the dynamic tasks with the latest batches (Airflow Variable) and starts executing them, but the downstream steps remain in the `upstream_failed` state.
   
   Before Refresh:
   
   ![image](https://user-images.githubusercontent.com/125476106/219071671-468d0c7b-5ec9-4bbe-97e6-e52024d7afa3.png)
   
   
   On Refresh:
   
   ![image](https://user-images.githubusercontent.com/125476106/219068865-961adba9-5854-47c7-b13e-6a44376a462e.png)
   
   
   
   - It removes the old (failed) steps, updates the steps with the latest batches (Airflow Variable) and marks all the steps, including the new dynamic steps, as `upstream_failed`.
   
   ![image](https://user-images.githubusercontent.com/125476106/219072232-94961f00-d567-4ca7-88ba-f3105ec3f415.png)
   
   
   To work around this issue we tried a few things, such as adding a dummy step with a delay of a few minutes before the dynamic step creation. But it still sometimes fails to update the dynamic steps.
   
   ![image](https://user-images.githubusercontent.com/125476106/219073322-8b245149-d328-4d58-ad47-7093cfb2e977.png)
   
   
   
   
   **Note:** Airflow version: 2.2.2
   
   ### What you think should happen instead
   
   It should update the DAG with the correct dynamic tasks.
   
   ### How to reproduce
   
   Create a few dynamic DAGs using Airflow Variables on Airflow 2.2.2 and try several runs with different variable values.
   
   ### Operating System
   
   -
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   MWAA
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   




[GitHub] [airflow] potiuk commented on issue #29555: Dag Fails while creating a Dynamic Tasks using airflow variables.

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on issue #29555:
URL: https://github.com/apache/airflow/issues/29555#issuecomment-1445125338

   Yes. You are missing the fact that Airflow 2.2 does not have Dynamic Task Mapping implemented. You need to upgrade (ideally to 2.5.1, as that one contains the most bug fixes and new features) in order to use Dynamic Task Mapping.




[GitHub] [airflow] sangrpatil2 commented on issue #29555: Dag Fails while creating a Dynamic Tasks using airflow variables.

Posted by "sangrpatil2 (via GitHub)" <gi...@apache.org>.
sangrpatil2 commented on issue #29555:
URL: https://github.com/apache/airflow/issues/29555#issuecomment-1439976504

   @potiuk 
   
   Thanks for the suggestion. I tried the sample code given in the above link, but I'm getting an error (Airflow version 2.2.2).
   
   **Code:**
   
   ```
   from __future__ import annotations
   
   from datetime import datetime
   
   from airflow import DAG
   from airflow.decorators import task
   
   with DAG(dag_id="example_dynamic_task_mapping", start_date=datetime(2022, 3, 4)) as dag:
   
       @task
       def add_one(x: int):
           return x + 1
   
       @task
       def sum_it(values):
           total = sum(values)
           print(f"Total was {total}")
   
       added_values = add_one.expand(x=[1, 2, 3])
       sum_it(added_values)
   ```
   **Error:**
   
   ![image](https://user-images.githubusercontent.com/125476106/220625336-7d7446a9-bfb9-465c-8358-a937d719f1ba.png)
   
   Is there anything missing in the above code?




[GitHub] [airflow] potiuk closed issue #29555: Dag Fails while creating a Dynamic Tasks using airflow variables.

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk closed issue #29555: Dag Fails while creating a Dynamic Tasks using airflow variables.
URL: https://github.com/apache/airflow/issues/29555




[GitHub] [airflow] hussein-awala commented on issue #29555: Dag Fails while creating a Dynamic Tasks using airflow variables.

Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala commented on issue #29555:
URL: https://github.com/apache/airflow/issues/29555#issuecomment-1436137871

   Can you provide some code for the mapped task group? It's not clear to me why you would map different TIs based on the batches list instead of mapping the same TIs with different inputs.




[GitHub] [airflow] potiuk commented on issue #29555: Dag Fails while creating a Dynamic Tasks using airflow variables.

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on issue #29555:
URL: https://github.com/apache/airflow/issues/29555#issuecomment-1439933296

   I believe (if I understand correctly) you cannot use Variables at the top level to dynamically add tasks like that. The structure of the DAG has to be fixed when it is parsed; it cannot dynamically create a different number of tasks like that between runs. You can "slowly" change the number of tasks when you construct your DAG based on some external data, but it cannot differ widely between the moment it is parsed and the moment it is executed.
   
   If you want a dynamic number of tasks changing between runs you have to use Dynamic Task Mapping: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html#dynamic-task-mapping - where you will have one task that produces the `batch_list` (it could be built from Variables) and then use the expand() method to expand the task list dynamically at runtime (what you are doing here is trying to dynamically change the structure of the DAG at the moment it is parsed, not at the moment it is executed).
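   
   For illustration, a rough sketch of that approach (task names are placeholders for your fetch/load steps; this needs Airflow 2.3+ because expand() does not exist in earlier versions):
   
   ```
   from __future__ import annotations
   
   from datetime import datetime
   
   from airflow import DAG
   from airflow.decorators import task
   
   with DAG(dag_id="dynamic_batches_example", start_date=datetime(2023, 1, 1)) as dag:
   
       @task
       def fetch_batches() -> list[int]:
           # Query the tables / read the Airflow Variable here and return the batch ids.
           return [1, 2]
   
       @task
       def load_batch(batch_id: int):
           # Run the Spark job / SSM command for a single batch.
           print(f"Loading batch {batch_id}")
   
       # expand() creates one mapped task instance per batch at runtime,
       # so the number of tasks can differ between runs.
       load_batch.expand(batch_id=fetch_batches())
   ```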
   
   Closing provisionally, we can reopen if I understood it wrongly




[GitHub] [airflow] boring-cyborg[bot] commented on issue #29555: Dag Fails while creating a Dynamic Tasks using airflow variables.

Posted by "boring-cyborg[bot] (via GitHub)" <gi...@apache.org>.
boring-cyborg[bot] commented on issue #29555:
URL: https://github.com/apache/airflow/issues/29555#issuecomment-1431580060

   Thanks for opening your first issue here! Be sure to follow the issue template!
   




[GitHub] [airflow] sangrpatil2 commented on issue #29555: Dag Fails while creating a Dynamic Tasks using airflow variables.

Posted by "sangrpatil2 (via GitHub)" <gi...@apache.org>.
sangrpatil2 commented on issue #29555:
URL: https://github.com/apache/airflow/issues/29555#issuecomment-1439511002

   @hussein-awala / @ephraimbuddy 
   
   The DAG only fails while creating dynamic tasks using Airflow Variables; the rest of the tasks/flow works fine. You can refer to the sample code given below:
   
   ```
   from airflow.utils.task_group import TaskGroup
   
   # Excerpt - start, fetch_batch, load_batch, finish, create_python_operator
   # and ssm_send_command are defined elsewhere in the DAG file.
   batch_list = [1, 2]  # [3, 4] for the next run; sourced from the Airflow Variable
   domain_list = ["A", "B", "C"]
   
   start >> fetch_batch >> load_batch
   
   for domain in domain_list:
       with TaskGroup(group_id=domain) as domain_tg:
           step_task_name = "load_" + domain
           task_list = []
   
           for i in range(0, len(batch_list)):
               batch_id = batch_list[i]
               task_list.append(create_python_operator(
                   dag=dag,
                   task_name=step_task_name + str(batch_id),
                   op_kwargs={
                       "command": "<execute shell script>"
                   },
                   python_callable=ssm_send_command,
                   trigger_rule="all_success"
               ))
               if i == 0:
                   load_batch >> task_list[0]
               else:
                   task_list[i - 1] >> task_list[i]
   
       domain_tg >> finish
   ```
   
   Let me know if you need more details about it.
   

