You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@airflow.apache.org by Devjyoti Patra <de...@qubole.com> on 2017/04/19 10:44:53 UTC

SubdagOperator work with CeleryExecutor

I am not able to make SubDagOperator work with CeleryExecutor. The
following code works well on my local setup (with LocalExecutor), but in
production, when clicking on "Zoom into Sub Dag", Airflow complains that
the DAG named "parent_name.child_name" is not found. Please tell me
what I missed in my implementation.

Thanks,
Devj


# Default arguments handed to the DAGs and operators defined below.
# NOTE(review): '${date_str}' and '${email_list}' look like template
# placeholders that are substituted before this file is parsed -- confirm.
default_args = dict(
    owner='airflow',
    start_date=datetime.strptime('${date_str}', '%Y-%m-%d'),
    email=['${email_list}'],
    email_on_failure=True,
    email_on_retry=True,
    retries=1,
    retry_delay=timedelta(minutes=5),
    queue='default',
)

def sub_dag(child_dag_name, default_args, start_date, schedule_interval, kas):
    """Build the child DAG that is handed to a SubDagOperator.

    The child DAG consists of one fork task, one QuboleOperator task
    (command_type 'sparkcmd') per entry of ``kas``, and one join task.
    Every per-entry task runs after the fork and before the join.

    :param child_dag_name: name of the child; the DAG id is built as
        ``'<PARENT_DAG_NAME>.<child_dag_name>'``.
    :param default_args: default arguments passed to the child DAG (and to
        the join task).
    :param start_date: start date passed to the child DAG.
    :param schedule_interval: schedule interval passed to the child DAG.
    :param kas: iterable of keys; one ``task_<key>`` is created per key.
        Its string form also labels the fork and join tasks.
    :return: the populated child DAG.
    """
    subdag = DAG(
        '%s.%s' % (PARENT_DAG_NAME, child_dag_name),
        default_args=default_args,
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    # `k` is the loop variable below and therefore local to this function;
    # reading it before the loop raises UnboundLocalError. The caller passes
    # its own `k` as `kas`, so `kas` yields the label that was intended.
    fork = DummyOperator(task_id='discovery_fork_' + str(kas),
                         dag=subdag)
    # The JOIN task has to be changed for writing to RDS
    join = BashOperator(
        task_id='join_' + str(kas),
        bash_command='echo "more wait for subdag..."',
        default_args=default_args,
        dag=subdag
    )

    for k in kas:
        task = QuboleOperator(
            task_id='task_' + str(k),
            command_type='sparkcmd',
            sql="SOME QUERY",
            qubole_conn_id='default',
            provide_context=True,
            dag=subdag)

        # fork -> task -> join
        task.set_upstream(fork)
        task.set_downstream(join)

    return subdag

# Parent pipeline: start -> setup_task -> one SubDagOperator per entry of
# k_per_subdag -> end.
dag = DAG(
    PARENT_DAG_NAME,
    default_args=default_args,
    schedule_interval='@hourly',
)

start_node = DummyOperator(task_id='start', dag=dag)

end_node = DummyOperator(task_id='end', dag=dag)

setup_task = QuboleOperator(
    task_id='setup_task',
    command_type='sparkcmd',
    sql="SOME QUERY",
    qubole_conn_id='default',
    provide_context=True,
    dag=dag,
)

# setup_task runs after start_node.
start_node.set_downstream(setup_task)

for k in k_per_subdag:
    child_name = SUBDAG_NAME_PREFIX + str(k)

    # The child DAG is built inline and is deliberately not bound to a
    # module-level name of its own.
    branch = SubDagOperator(
        task_id=child_name,
        subdag=sub_dag(child_name, default_args, dag.start_date,
                       dag.schedule_interval, k),
        default_args=default_args,
        dag=dag,
    )

    # setup_task -> branch -> end_node
    setup_task.set_downstream(branch)
    end_node.set_upstream(branch)

Re: SubdagOperator work with CeleryExecutor

Posted by Devjyoti Patra <de...@qubole.com>.
Thank you Paul and Bolke.

We have an older version of Airflow deployed on our prod system, and I
could make the pipeline work on it by replacing the following import
statement:


from airflow.operators.subdag_operator import SubDagOperator

with

from airflow.operators import SubDagOperator


On Fri, Apr 21, 2017 at 8:34 PM, Bolke de Bruin <bd...@gmail.com> wrote:

> Most of the (known) issues will be fixed in 1.8.1, which we are working on
> getting out the door.
>
> I'm not sure what can cause the below issue, and will investigate when I
> have some time.
>
> Bolke
>
> Sent from my iPhone
>
> > On 21 Apr 2017, at 16:50, Paul Zaczkiewicz <pa...@gmail.com> wrote:
> >
> > There are major outstanding issues with SubDagOperator. in 1.8.0 separate
> > from what you're experiencing here.  Sometimes a SubDag run will just
> hang,
> > and you can't re-run portions of the SubDag without re-running the entire
> > SubDag. I'd recommend against using SubDags in 1.8.0.
> >
> > I'm not sure what's causing your Zoom Into Subdag issues though.
> >
> > On Wed, Apr 19, 2017 at 6:44 AM, Devjyoti Patra <de...@qubole.com>
> > wrote:
> >
> >> I am not able to make SubdagOperator work with CeleryExecutor. The
> >> following code works well on my local setup (with LocalExecutor), but on
> >> production, when clicking on "Zoom into Sub Dag", Airflow complains that
> >> dag with the name  "parent_name.child_name". is not found. Please tell
> me
> >> what did I miss out in my implementation.
> >>
> >> Thanks,
> >> Devj
> >>
> >>
> >> default_args = {
> >>    'owner': 'airflow',
> >>    'start_date': datetime.strptime('${date_str}', '%Y-%m-%d'),
> >>    'email': ['${email_list}'],
> >>    'email_on_failure': True,
> >>    'email_on_retry': True,
> >>    'retries': 1,
> >>    'retry_delay': timedelta(minutes=5),
> >>    'queue': 'default'}
> >>
> >> def sub_dag(child_dag_name, default_args, start_date, schedule_interval,
> >> kas):
> >>    subdag = DAG(
> >>        '%s.%s' % (PARENT_DAG_NAME, child_dag_name),
> >>        default_args=default_args,
> >>        schedule_interval=schedule_interval,
> >>        start_date=start_date,
> >>    )
> >>
> >>    fork = DummyOperator(task_id='discovery_fork_' + str(k),
> >>                                 dag=subdag)
> >>    #The JOIN task has to be changed for writing to RDS
> >>     join = BashOperator(
> >>            task_id='join_' + str(k),
> >>            bash_command='echo "more wait for subdag..."',
> >>            default_args=default_args,
> >>            dag=subdag
> >>      )
> >>
> >> for k in kas:
> >> task = QuboleOperator(
> >>
> >>               task_id='task_' + str(k),
> >>               command_type='sparkcmd',
> >>               sql="SOME QUERY",
> >>               qubole_conn_id='default',
> >>               provide_context=True,
> >>               dag=subdag)
> >>
> >>         task.set_upstream(fork)
> >>         task.set_downstream(join)
> >>
> >>
> >> return subdag
> >>
> >> #Airflow pipeline is created below
> >> dag = DAG(PARENT_DAG_NAME,
> >>        default_args=default_args,
> >>        schedule_interval='@hourly')
> >>
> >>
> >>
> >> start_node = DummyOperator(task_id='start',
> >>                           dag=dag)
> >>
> >>
> >> end_node = DummyOperator(task_id='end',
> >>                         dag=dag)
> >>
> >>
> >>
> >> setup_task = QuboleOperator(
> >>        task_id='setup_task',
> >>        command_type='sparkcmd',
> >>        sql="SOME QUERY",
> >>        qubole_conn_id='default',
> >>        provide_context=True,
> >>        dag=dag)
> >>
> >> setup_task.set_upstream(start_node)
> >>
> >>
> >> for k in k_per_subdag:
> >>    child_name = SUBDAG_NAME_PREFIX + str(k)
> >>
> >>    branch = SubDagOperator(
> >>        subdag=sub_dag(child_name, default_args, dag.start_date,
> >> dag.schedule_interval, k),
> >>        default_args=default_args,
> >>        task_id=child_name,
> >>        dag=dag
> >>    )
> >>
> >>    branch.set_upstream(setup_task)
> >>    branch.set_downstream(end_node)
> >>
>

Re: SubdagOperator work with CeleryExecutor

Posted by Bolke de Bruin <bd...@gmail.com>.
Most of the (known) issues will be fixed in 1.8.1, which we are working on getting out the door. 

I'm not sure what can cause the below issue, and will investigate when I have some time. 

Bolke

Sent from my iPhone

> On 21 Apr 2017, at 16:50, Paul Zaczkiewicz <pa...@gmail.com> wrote:
> 
> There are major outstanding issues with SubDagOperator. in 1.8.0 separate
> from what you're experiencing here.  Sometimes a SubDag run will just hang,
> and you can't re-run portions of the SubDag without re-running the entire
> SubDag. I'd recommend against using SubDags in 1.8.0.
> 
> I'm not sure what's causing your Zoom Into Subdag issues though.
> 
> On Wed, Apr 19, 2017 at 6:44 AM, Devjyoti Patra <de...@qubole.com>
> wrote:
> 
>> I am not able to make SubdagOperator work with CeleryExecutor. The
>> following code works well on my local setup (with LocalExecutor), but on
>> production, when clicking on "Zoom into Sub Dag", Airflow complains that
>> dag with the name  "parent_name.child_name". is not found. Please tell me
>> what did I miss out in my implementation.
>> 
>> Thanks,
>> Devj
>> 
>> 
>> default_args = {
>>    'owner': 'airflow',
>>    'start_date': datetime.strptime('${date_str}', '%Y-%m-%d'),
>>    'email': ['${email_list}'],
>>    'email_on_failure': True,
>>    'email_on_retry': True,
>>    'retries': 1,
>>    'retry_delay': timedelta(minutes=5),
>>    'queue': 'default'}
>> 
>> def sub_dag(child_dag_name, default_args, start_date, schedule_interval,
>> kas):
>>    subdag = DAG(
>>        '%s.%s' % (PARENT_DAG_NAME, child_dag_name),
>>        default_args=default_args,
>>        schedule_interval=schedule_interval,
>>        start_date=start_date,
>>    )
>> 
>>    fork = DummyOperator(task_id='discovery_fork_' + str(k),
>>                                 dag=subdag)
>>    #The JOIN task has to be changed for writing to RDS
>>     join = BashOperator(
>>            task_id='join_' + str(k),
>>            bash_command='echo "more wait for subdag..."',
>>            default_args=default_args,
>>            dag=subdag
>>      )
>> 
>> for k in kas:
>> task = QuboleOperator(
>> 
>>               task_id='task_' + str(k),
>>               command_type='sparkcmd',
>>               sql="SOME QUERY",
>>               qubole_conn_id='default',
>>               provide_context=True,
>>               dag=subdag)
>> 
>>         task.set_upstream(fork)
>>         task.set_downstream(join)
>> 
>> 
>> return subdag
>> 
>> #Airflow pipeline is created below
>> dag = DAG(PARENT_DAG_NAME,
>>        default_args=default_args,
>>        schedule_interval='@hourly')
>> 
>> 
>> 
>> start_node = DummyOperator(task_id='start',
>>                           dag=dag)
>> 
>> 
>> end_node = DummyOperator(task_id='end',
>>                         dag=dag)
>> 
>> 
>> 
>> setup_task = QuboleOperator(
>>        task_id='setup_task',
>>        command_type='sparkcmd',
>>        sql="SOME QUERY",
>>        qubole_conn_id='default',
>>        provide_context=True,
>>        dag=dag)
>> 
>> setup_task.set_upstream(start_node)
>> 
>> 
>> for k in k_per_subdag:
>>    child_name = SUBDAG_NAME_PREFIX + str(k)
>> 
>>    branch = SubDagOperator(
>>        subdag=sub_dag(child_name, default_args, dag.start_date,
>> dag.schedule_interval, k),
>>        default_args=default_args,
>>        task_id=child_name,
>>        dag=dag
>>    )
>> 
>>    branch.set_upstream(setup_task)
>>    branch.set_downstream(end_node)
>> 

Re: SubdagOperator work with CeleryExecutor

Posted by Paul Zaczkiewicz <pa...@gmail.com>.
There are major outstanding issues with SubDagOperator in 1.8.0, separate
from what you're experiencing here.  Sometimes a SubDag run will just hang,
and you can't re-run portions of the SubDag without re-running the entire
SubDag. I'd recommend against using SubDags in 1.8.0.

I'm not sure what's causing your Zoom Into Subdag issues though.

On Wed, Apr 19, 2017 at 6:44 AM, Devjyoti Patra <de...@qubole.com>
wrote:

> I am not able to make SubdagOperator work with CeleryExecutor. The
> following code works well on my local setup (with LocalExecutor), but on
> production, when clicking on "Zoom into Sub Dag", Airflow complains that
> dag with the name  "parent_name.child_name". is not found. Please tell me
> what did I miss out in my implementation.
>
> Thanks,
> Devj
>
>
> default_args = {
>     'owner': 'airflow',
>     'start_date': datetime.strptime('${date_str}', '%Y-%m-%d'),
>     'email': ['${email_list}'],
>     'email_on_failure': True,
>     'email_on_retry': True,
>     'retries': 1,
>     'retry_delay': timedelta(minutes=5),
>     'queue': 'default'}
>
> def sub_dag(child_dag_name, default_args, start_date, schedule_interval,
> kas):
>     subdag = DAG(
>         '%s.%s' % (PARENT_DAG_NAME, child_dag_name),
>         default_args=default_args,
>         schedule_interval=schedule_interval,
>         start_date=start_date,
>     )
>
>     fork = DummyOperator(task_id='discovery_fork_' + str(k),
>                                  dag=subdag)
>     #The JOIN task has to be changed for writing to RDS
>      join = BashOperator(
>             task_id='join_' + str(k),
>             bash_command='echo "more wait for subdag..."',
>             default_args=default_args,
>             dag=subdag
>       )
>
> for k in kas:
> task = QuboleOperator(
>
>                task_id='task_' + str(k),
>                command_type='sparkcmd',
>                sql="SOME QUERY",
>                qubole_conn_id='default',
>                provide_context=True,
>                dag=subdag)
>
>          task.set_upstream(fork)
>          task.set_downstream(join)
>
>
> return subdag
>
> #Airflow pipeline is created below
> dag = DAG(PARENT_DAG_NAME,
>         default_args=default_args,
>         schedule_interval='@hourly')
>
>
>
> start_node = DummyOperator(task_id='start',
>                            dag=dag)
>
>
> end_node = DummyOperator(task_id='end',
>                          dag=dag)
>
>
>
> setup_task = QuboleOperator(
>         task_id='setup_task',
>         command_type='sparkcmd',
>         sql="SOME QUERY",
>         qubole_conn_id='default',
>         provide_context=True,
>         dag=dag)
>
> setup_task.set_upstream(start_node)
>
>
> for k in k_per_subdag:
>     child_name = SUBDAG_NAME_PREFIX + str(k)
>
>     branch = SubDagOperator(
>         subdag=sub_dag(child_name, default_args, dag.start_date,
> dag.schedule_interval, k),
>         default_args=default_args,
>         task_id=child_name,
>         dag=dag
>     )
>
>     branch.set_upstream(setup_task)
>     branch.set_downstream(end_node)
>