Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/08/19 20:02:59 UTC
[GitHub] [airflow] ayanray089 opened a new issue #10408: Dynamic DAG Workflow
ayanray089 opened a new issue #10408:
URL: https://github.com/apache/airflow/issues/10408
Hi,
I am facing an issue while creating a dynamic workflow from a JSON file. The goal is to store all parent and child relationships in a JSON config file and to create the DAG workflow dynamically based on that config.
It works for a simple sequential scenario, where each task is the parent of the next downstream task.
JSON file:
task_dependency = '{"dependency": [{"Task": "step_1","Parent":"step_0", "command":"echo hello step 1"},{"Task": "step_2","Parent": "step_1", "command":"echo hello step 2"},{"Task": "step_3", "Parent": "step_2", "command":"echo hello step 3"},{"Task": "step_4", "Parent": "step_2", "command":"echo hello step 4"}]}'
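Each entry can be read as an edge from `Parent` to `Task`. The following minimal plain-Python sketch needs no Airflow, and `children_by_parent` is a hypothetical helper. It turns the config into a parent-to-children map, which makes the intended shape visible: `step_2` has two children, and `step_0` is referenced as a parent but never defined as a task.

```python
import json
from collections import defaultdict

task_dependency = '{"dependency": [{"Task": "step_1","Parent":"step_0", "command":"echo hello step 1"},{"Task": "step_2","Parent": "step_1", "command":"echo hello step 2"},{"Task": "step_3", "Parent": "step_2", "command":"echo hello step 3"},{"Task": "step_4", "Parent": "step_2", "command":"echo hello step 4"}]}'


def children_by_parent(config_json):
    """Map each Parent id to the list of Task ids that depend on it."""
    children = defaultdict(list)
    for entry in json.loads(config_json)["dependency"]:
        children[entry["Parent"]].append(entry["Task"])
    return dict(children)


graph = children_by_parent(task_dependency)
print(graph)
# {'step_0': ['step_1'], 'step_1': ['step_2'], 'step_2': ['step_3', 'step_4']}

# Parents that never appear as a Task (here only step_0) have no operator of their own.
task_ids = {e["Task"] for e in json.loads(task_dependency)["dependency"]}
print(sorted(set(graph) - task_ids))
# ['step_0']
```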
**This works:**
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from helper import operation
import json
import logging
default_args = {
    'owner': 'some.user',
    'start_date': airflow.utils.dates.days_ago(1),
    # 'end_date': datetime(2020, 8, 31),  ## optional
    'depends_on_past': False,
    'email': ['some.user@noodle.ai'],
    'email_on_failure': False,
    'email_on_success': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=2)
}
# Instantiate a DAG
dag = DAG(
    'task_dependency_json',
    default_args=default_args,
    description='Simple Demo DAG TASK Dependency',
    schedule_interval='*/60 * * * *',  ### This should be cron expression
)
# Tasks
dep = json.loads(operation.task_dependency_1)
for i in range(len(dep['dependency'])):
    task = BashOperator(
        task_id=dep['dependency'][i]["Task"],
        bash_command=dep['dependency'][i]["command"],
        dag=dag)
    if i == 0:
        parent = task
    else:
        task.set_upstream(parent)
        parent = task
    print('Parent', parent)
    print('Task', task)
**But it creates a DAG like this:**
step_1 >> step_2 >> step_3 >> step_4
**What I want is a DAG like this:**
step_1 >> step_2 >> step_3
                 >> step_4
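The chain comes from the first loop linking each task to whichever entry precedes it in the list. The loop never reads the `Parent` field. The sketch below assumes `operation.task_dependency_1` holds the same entries as the config above, since the issue does not show it. It compares the two edge sets in plain Python, and `edges_by_list_order` and `edges_by_parent` are illustrative helpers, not Airflow functions.

```python
import json

task_dependency = '{"dependency": [{"Task": "step_1","Parent":"step_0", "command":"echo hello step 1"},{"Task": "step_2","Parent": "step_1", "command":"echo hello step 2"},{"Task": "step_3", "Parent": "step_2", "command":"echo hello step 3"},{"Task": "step_4", "Parent": "step_2", "command":"echo hello step 4"}]}'
entries = json.loads(task_dependency)["dependency"]


def edges_by_list_order(entries):
    """Edges the first loop creates: every task depends on the previous entry."""
    return [(entries[i - 1]["Task"], entries[i]["Task"]) for i in range(1, len(entries))]


def edges_by_parent(entries):
    """Edges the config describes, skipping parents that are not tasks themselves."""
    task_ids = {e["Task"] for e in entries}
    return [(e["Parent"], e["Task"]) for e in entries if e["Parent"] in task_ids]


print(edges_by_list_order(entries))
# [('step_1', 'step_2'), ('step_2', 'step_3'), ('step_3', 'step_4')]
print(edges_by_parent(entries))
# [('step_1', 'step_2'), ('step_2', 'step_3'), ('step_2', 'step_4')]
```

The two lists differ only in the last edge: `step_3 -> step_4` from list order versus `step_2 -> step_4` from the config.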
**To implement this, I wrote the code below:**
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from helper import operation
import json
import logging
default_args = {
    'owner': 'some.user',
    'start_date': airflow.utils.dates.days_ago(1),
    # 'end_date': datetime(2020, 8, 31),  ## optional
    'depends_on_past': False,
    'email': ['some.user@noodle.ai'],
    'email_on_failure': False,
    'email_on_success': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}
# Instantiate a DAG
dag = DAG(
    'task_dependency_json',
    default_args=default_args,
    description='Simple Demo DAG TASK Dependency',
    schedule_interval='*/60 * * * *',  ### This should be cron expression
)
# Tasks
dep = json.loads(operation.task_dependency)
for i in range(len(dep['dependency'])):
    task = BashOperator(
        task_id=dep['dependency'][i]["Task"],
        bash_command='date',
        dag=dag)
    # print('Iteration', i)
    logging.info("test")
    if i > 0:
        parent = BashOperator(
            task_id=dep['dependency'][i]["Parent"],
            bash_command='date',
            dag=dag)
        parent.set_downstream(task)
Can you please let me know how I can achieve this?
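One way to get the branch is to create every task exactly once, keep the tasks in a dict keyed by `task_id`, and only then connect each task to the one named in its `Parent` field. The second attempt above builds a new `BashOperator` for the parent on every iteration, reusing a `task_id` that is already in the DAG (for example `step_1` on the second iteration). The new object is a second operator rather than the task created earlier, and Airflow typically warns about or rejects such duplicate task ids. The following is a minimal sketch under stated assumptions. `build_tasks` and `RecordingTask` are hypothetical names. `RecordingTask` only stands in for an operator so the wiring can be checked without Airflow. Parents that are not themselves tasks (`step_0`) are skipped.

```python
import json
from typing import Any, Callable, Dict

task_dependency = '{"dependency": [{"Task": "step_1","Parent":"step_0", "command":"echo hello step 1"},{"Task": "step_2","Parent": "step_1", "command":"echo hello step 2"},{"Task": "step_3", "Parent": "step_2", "command":"echo hello step 3"},{"Task": "step_4", "Parent": "step_2", "command":"echo hello step 4"}]}'


def build_tasks(config_json: str, make_task: Callable[[Dict[str, Any]], Any]) -> Dict[str, Any]:
    """Create one task per config entry, then connect each task to its parent.

    `make_task` receives one config entry and returns an object with a
    `set_upstream` method (for example an Airflow operator).
    """
    entries = json.loads(config_json)["dependency"]
    # Pass 1: create every task exactly once, keyed by its task_id.
    tasks = {entry["Task"]: make_task(entry) for entry in entries}
    # Pass 2: look the parent up instead of constructing it again.
    for entry in entries:
        parent_id = entry.get("Parent")
        if parent_id in tasks:  # skip parents with no task of their own, e.g. step_0
            tasks[entry["Task"]].set_upstream(tasks[parent_id])
    return tasks


class RecordingTask:
    """Hypothetical stand-in for an operator; records the ids of its upstream tasks."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream_ids = []

    def set_upstream(self, other: "RecordingTask") -> None:
        self.upstream_ids.append(other.task_id)


tasks = build_tasks(task_dependency, lambda entry: RecordingTask(entry["Task"]))
for task_id, task in tasks.items():
    print(task_id, "<-", task.upstream_ids)
# step_1 <- []
# step_2 <- ['step_1']
# step_3 <- ['step_2']
# step_4 <- ['step_2']
```

In the DAG file, the factory would create real operators instead. For example, `build_tasks(operation.task_dependency, lambda e: BashOperator(task_id=e["Task"], bash_command=e["command"], dag=dag))` should give `step_1 >> step_2 >> [step_3, step_4]`. Creating all tasks before wiring any of them means the config entries can appear in any order. If `step_0` should be a real start task, one option is to add an entry for it to the config.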
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] boring-cyborg[bot] commented on issue #10408: Dynamic DAG Workflow
Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #10408:
URL: https://github.com/apache/airflow/issues/10408#issuecomment-676632963
Thanks for opening your first issue here! Be sure to follow the issue template!
[GitHub] [airflow] eladkal commented on issue #10408: Dynamic DAG Workflow
Posted by GitBox <gi...@apache.org>.
eladkal commented on issue #10408:
URL: https://github.com/apache/airflow/issues/10408#issuecomment-739898006
You can ask for support on Stack Overflow or in the troubleshooting channel on Slack.
[GitHub] [airflow] eladkal closed issue #10408: Dynamic DAG Workflow
Posted by GitBox <gi...@apache.org>.
eladkal closed issue #10408:
URL: https://github.com/apache/airflow/issues/10408