Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/08/19 20:02:59 UTC

[GitHub] [airflow] ayanray089 opened a new issue #10408: Dynamic DAG Workflow

ayanray089 opened a new issue #10408:
URL: https://github.com/apache/airflow/issues/10408


   Hi,
   
   I am facing an issue while creating a dynamic workflow from a JSON file. The goal is to store all parent-child relationships in a JSON config file and to create the DAG workflow dynamically from that config.
   It works for the simple sequential scenario, where each task is the parent of the next downstream task.
   
   JSON file:
   task_dependency = '{"dependency": [{"Task": "step_1","Parent":"step_0", "command":"echo hello step 1"},{"Task": "step_2","Parent": "step_1", "command":"echo hello step 2"},{"Task": "step_3", "Parent": "step_2", "command":"echo hello step 3"},{"Task": "step_4", "Parent": "step_2", "command":"echo hello step 4"}]}'
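To make the intended structure explicit, the config above can be reduced to a list of (parent, child) edges in plain Python, without Airflow. This is only a sketch: `parent_child_edges` is a hypothetical helper name, and it assumes that a parent which is not itself listed as a task (here `step_0`) simply means "no upstream task".

```python
import json

# Same content as the config above, split across lines for readability.
task_dependency = (
    '{"dependency": ['
    '{"Task": "step_1", "Parent": "step_0", "command": "echo hello step 1"},'
    '{"Task": "step_2", "Parent": "step_1", "command": "echo hello step 2"},'
    '{"Task": "step_3", "Parent": "step_2", "command": "echo hello step 3"},'
    '{"Task": "step_4", "Parent": "step_2", "command": "echo hello step 4"}'
    ']}'
)


def parent_child_edges(config_json):
    """Return (parent, child) pairs, skipping parents that are not defined as tasks."""
    entries = json.loads(config_json)["dependency"]
    known_tasks = {entry["Task"] for entry in entries}
    return [
        (entry["Parent"], entry["Task"])
        for entry in entries
        if entry["Parent"] in known_tasks  # assumption: unknown parent = root task
    ]


edges = parent_child_edges(task_dependency)
print(edges)
# [('step_1', 'step_2'), ('step_2', 'step_3'), ('step_2', 'step_4')]
```

Both `step_3` and `step_4` list `step_2` as their parent, so the config describes a fan-out after `step_2` rather than a straight chain.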
   
   **This works:**
   
   from datetime import timedelta
   import airflow
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from airflow.operators.python_operator import PythonOperator
   from airflow.operators.dummy_operator import DummyOperator
   from helper import operation
   import json
   import logging
   
   default_args = {
       'owner' : 'some.user',
       'start_date' : airflow.utils.dates.days_ago(1),
       # 'end_date' : datetime(2020, 8, 31), ## optional
       'depends_on_past' : False,
       'email' : ['some.user@noodle.ai'],
       'email_on_failure': False,
       'email_on_success': False,
       'retries' : 1,
       'retry_delay' : timedelta(minutes = 2)
   }
   #Instantiate a DAG
   dag = DAG (
       'task_dependency_json',
       default_args = default_args,
       description = 'Simple Demo DAG TASK Dependency',
       schedule_interval = '*/60 * * * *', ### This should be cron expression
   )
   #Tasks
   dep = json.loads(operation.task_dependency_1)
   
   
   
   for i, entry in enumerate(dep['dependency']):
       task = BashOperator(
           task_id=entry["Task"],
           bash_command=entry["command"],
           dag=dag)

       # Each task is chained to the task created just before it,
       # regardless of the "Parent" value in the config.
       if i > 0:
           task.set_upstream(parent)
       parent = task

       print('Parent', parent)
       print('Task', task)
       
   **But it creates a DAG like:**
   
   step_1 >> step_2 >> step_3 >> step_4
   
   **But I want a DAG like:**
   
   step_1 >> step_2 >> step_3
                    >> step_4
   
   **To implement the above requirement, I wrote the code below:**
   
   from datetime import timedelta
   import airflow
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from airflow.operators.python_operator import PythonOperator
   from helper import operation
   import json
   import logging
   default_args = {
       'owner' : 'some.user',
       'start_date' : airflow.utils.dates.days_ago(1),
       # 'end_date' : datetime(2020, 8, 31), ## optional
       'depends_on_past' : False,
       'email' : ['some.user@noodle.ai'],
       'email_on_failure': False,
       'email_on_success': False,
       'retries' : 1,
       'retry_delay' : timedelta(minutes = 1)
   }
   #Instantiate a DAG
   dag = DAG (
       'task_dependency_json',
       default_args = default_args,
       description = 'Simple Demo DAG TASK Dependency',
       schedule_interval = '*/60 * * * *', ### This should be cron expression
   )
   #Tasks
   dep = json.loads(operation.task_dependency)
   for i in range(len(dep['dependency'])):
       task = BashOperator(
         task_id=dep['dependency'][i]["Task"],
         bash_command='date',
         dag=dag)
       #print('Iteration',i)
       logging.info("test")
       if i>0:
           parent = BashOperator(
                   task_id=dep['dependency'][i]["Parent"],
                   bash_command='date',
                   dag=dag)
           parent.set_downstream(task)
   
   
   Can you please let me know how I can achieve this?
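One possible direction, sketched under assumptions rather than offered as a confirmed fix: the second attempt constructs a new BashOperator for every parent, reusing a task_id that already exists in the DAG, which is presumably where it goes wrong. A two-pass approach avoids that by creating each task exactly once and then looking parents up by name. In the sketch below, `build_tasks` and `FakeTask` are hypothetical names; `FakeTask` is only a stand-in so the example runs without Airflow, and a parent that is not defined as a task (such as `step_0`) is assumed to mean "no upstream task".

```python
import json


def build_tasks(config_json, make_task):
    """Create one task per config entry, then wire each task to its parent."""
    entries = json.loads(config_json)["dependency"]

    # Pass 1: create exactly one task object per "Task" name.
    tasks = {
        entry["Task"]: make_task(entry["Task"], entry["command"])
        for entry in entries
    }

    # Pass 2: look up the parent created in pass 1 instead of building a new one.
    for entry in entries:
        parent = tasks.get(entry["Parent"])
        if parent is not None:  # assumption: unknown parent = root task
            parent.set_downstream(tasks[entry["Task"]])
    return tasks


class FakeTask:
    """Stand-in for an Airflow operator; records downstream task ids."""

    def __init__(self, task_id, command):
        self.task_id = task_id
        self.command = command
        self.downstream = []

    def set_downstream(self, other):
        self.downstream.append(other.task_id)


config = (
    '{"dependency": ['
    '{"Task": "step_1", "Parent": "step_0", "command": "echo hello step 1"},'
    '{"Task": "step_2", "Parent": "step_1", "command": "echo hello step 2"},'
    '{"Task": "step_3", "Parent": "step_2", "command": "echo hello step 3"},'
    '{"Task": "step_4", "Parent": "step_2", "command": "echo hello step 4"}'
    ']}'
)

tasks = build_tasks(config, FakeTask)
for task in tasks.values():
    print(task.task_id, "->", task.downstream)
```

In the DAG file itself, the factory could presumably be something like `lambda task_id, command: BashOperator(task_id=task_id, bash_command=command, dag=dag)`, passed together with `operation.task_dependency`; this has not been verified against a running Airflow instance.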
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] boring-cyborg[bot] commented on issue #10408: Dynamic DAG Workflow

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #10408:
URL: https://github.com/apache/airflow/issues/10408#issuecomment-676632963


   Thanks for opening your first issue here! Be sure to follow the issue template!
   





[GitHub] [airflow] eladkal commented on issue #10408: Dynamic DAG Workflow

Posted by GitBox <gi...@apache.org>.
eladkal commented on issue #10408:
URL: https://github.com/apache/airflow/issues/10408#issuecomment-739898006


   You can ask for support on Stack Overflow or in the troubleshooting channel on Slack.





[GitHub] [airflow] eladkal closed issue #10408: Dynamic DAG Workflow

Posted by GitBox <gi...@apache.org>.
eladkal closed issue #10408:
URL: https://github.com/apache/airflow/issues/10408


   

