Posted to users@airflow.apache.org by Reed Villanueva <rv...@ucera.org> on 2020/01/27 22:50:09 UTC

Can airflow dags have branches that both run, but do not converge?

...because this seems to not be the case.

I have an airflow graph with a conditional branch defined *like* this:

from enum import Enum
from airflow.operators.bash_operator import BashOperator

class BranchFlags(Enum):
    yes = "yes"
    no = "no"

...

for table in list_of_tables:  # list_of_tables is a list of per-table config dicts
    task_1 = BashOperator(
        task_id='task_1_%s' % table["conf1"],
        bash_command='bash script1.sh %s' % table["conf1"],
        dag=dag)

    if table["branch_flag"] == BranchFlags.yes:
        consolidate = BashOperator(
            task_id='task_3_%s' % table["conf2"],
            # note: two %s placeholders need a tuple of two values
            bash_command='python %s/consolidate_parquet.py %s' % (table["conf2"], table["conf2"]),
            dag=dag)

    task_3 = BashOperator(
        task_id='task_3_%s' % table["conf3"],
        bash_command='bash script3.sh %s' % table["conf3"],
        dag=dag)

    task_1 >> task_3
    if table["branch_flag"] == BranchFlags.yes:
        # task_2 here presumably refers to the conditional 'consolidate' task above
        task_1 >> task_2

Even though the longer parts of the graph run fine, the lone branch is not
being run for the one sequence / pipeline that was supposed to branch. When
viewing the logs for the task, I see

*** Task instance did not exist in the DB

This is weird to me, since ostensibly the scheduler DB sees the task, given
that it does appear in the web UI graph. I'm not sure what is going on here;
other changes added to the dag .py file do show up in the graph and are
executed by the scheduler when running the graph. Attempting to view the
task's Task Instance Details throws the error

Task [dagname.task_3_qwerty] doesn't seem to exist at the moment

Running airflow resetdb (as I've seen suggested in other posts) does not fix
the problem.

Note that the intention is that the short branch runs concurrently with the
longer branch (not as an either/or choice).
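
For reference, here is a minimal, self-contained sketch of the shape I'm after
(DummyOperators and names are purely illustrative, not the real DAG): one
upstream task fans out to two downstream tasks that both run and never rejoin.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

dag = DAG('fanout_sketch', schedule_interval='@once', start_date=days_ago(1))

with dag:
    task_1 = DummyOperator(task_id='task_1')
    task_3 = DummyOperator(task_id='task_3')              # long branch
    consolidate = DummyOperator(task_id='consolidate')    # short branch

    task_1 >> task_3
    task_1 >> consolidate  # both downstream tasks are expected to run; no join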

Anyone know why this would be happening or have some debugging tips?


Re: Can airflow dags have branches that both run, but do not converge?

Posted by Reed Villanueva <rv...@ucera.org>.
Yes, I could see all the expected tasks in the DAG in the web UI.
I have not tried the list_dags CLI method. (FYI: this experience was some time
ago and the DAG no longer looks this way; I was asking because I was trying to
plan for a future DAG design and remembered this bug.)



Re: Can airflow dags have branches that both run, but do not converge?

Posted by Chris Palmer <ch...@crpalmer.com>.
When you look at the DAG in Graph View or Tree View, do you see all the
expected tasks with all the expected dependencies? What about when you
use the CLI to list the tasks for the DAG?
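
For reference, on Airflow 1.10.x the commands I have in mind are roughly the
following (your_dag_id is a placeholder):

    airflow list_dags                  # confirm the file parses and the DAG is registered
    airflow list_tasks your_dag_id     # list every task the scheduler actually sees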

Chris


Re: Can airflow dags have branches that both run, but do not converge?

Posted by Reed Villanueva <rv...@ucera.org>.
Interesting that the split paths run fine for you, thanks.
To answer your question, the "pipelines" for the different datasets can run
in parallel, but for some odd reason the split path in pipeline2 never runs
the extra task that differs from pipeline1 unless I add some downstream task
in pipeline2 that both tasks of pipeline2's split converge onto.
Thought this was very odd, but thanks for confirming that it is not normal.


RE: Can airflow dags have branches that both run, but do not converge?

Posted by "Shaw, Damian P. " <da...@credit-suisse.com>.
I am able to create your example DAG with dummy operators and it runs fine:

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('test_branches',
          schedule_interval='@once',
          start_date=days_ago(2))

with dag:
    root = DummyOperator(task_id='root')

    # Pipeline 1
    pipe1 = DummyOperator(task_id='pipe1')
    pipe1.set_upstream(root)

    pipe1a = DummyOperator(task_id='pipe1a')
    pipe1a.set_upstream(pipe1)
    pipe1b = DummyOperator(task_id='pipe1b')
    pipe1b.set_upstream(pipe1)

    # Pipeline 2
    pipe2 = DummyOperator(task_id='pipe2')
    pipe2.set_upstream(root)

    pipe2a = DummyOperator(task_id='pipe2a')
    pipe2a.set_upstream(pipe2)


Does this DAG run fine for you, with the different branches running in parallel? If not, maybe there’s some configuration issue in your Airflow; otherwise there’s some logic problem in your DAG creation.

Damian




Re: Can airflow dags have branches that both run, but do not converge?

Posted by Reed Villanueva <rv...@ucera.org>.
I expect the branch to be made at DAG creation time and not at run time.
Ie. my dag looks like...
[image: image.png]
(which is static and is what I want to see), and for some reason the dag
only seems to run the purple task when I have another downstream task that
both the blue and purple tasks in that "pipeline" connect to. (Yet the
split "pipelines" for dataset 1 and 2 can both run concurrently without
having to converge back together downstream.) Note that I can use
conditional logic to put the purple task before or after the blue task in
pipeline 2, but oddly, when I make the path split, it does not run.



RE: Can airflow dags have branches that both run, but do not converge?

Posted by "Shaw, Damian P. " <da...@credit-suisse.com>.
What I mean by dynamic is this in your code:


    if table["branch_flag"] == BranchFlags.yes:

        task_1 >> task_2

This will be run at DAG creation time, not at DAG run time, and therefore your “branch” will not work as you expect.

To get conditional branches you have to use the Branch Operators (either the BranchPythonOperator or sub-class the BaseBranchOperator):
https://airflow.apache.org/docs/stable/concepts.html?highlight=branch#branching
https://www.astronomer.io/guides/airflow-branch-operator/
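
A minimal sketch of the BranchPythonOperator pattern (Airflow 1.10-style
imports; the task names and the choice logic are made up for illustration):

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.utils.dates import days_ago

dag = DAG('branch_sketch', schedule_interval='@daily', start_date=days_ago(2))

def pick_path(**context):
    # Runs at DAG *run* time; return the task_id (or list of task_ids) to follow.
    return 'write_local' if context['execution_date'].day % 2 == 0 else 'write_remote'

with dag:
    branch = BranchPythonOperator(task_id='branch',
                                  python_callable=pick_path,
                                  provide_context=True)
    write_local = DummyOperator(task_id='write_local')
    write_remote = DummyOperator(task_id='write_remote')
    branch >> [write_local, write_remote]   # the task that is not returned gets skipped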


Damian


Re: Can airflow dags have branches that both run, but do not converge?

Posted by Reed Villanueva <rv...@ucera.org>.
I am not trying to dynamically branch while the dag is running, if that's
what you're asking.
The dag is intended to be static.
In the code given, the list_of_tables variable is a list of dicts of
configs for different datasets, some of which use tasks that could be done
concurrently (e.g. write to local drive and write to remote DB), hence the
branching.
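
Roughly, something like this (keys and values made up for illustration;
BranchFlags is the Enum from my first message):

list_of_tables = [
    {"conf1": "members", "conf2": "/opt/etl/members", "conf3": "members_final",
     "branch_flag": BranchFlags.yes},   # needs the extra concurrent task
    {"conf1": "claims", "conf2": "/opt/etl/claims", "conf3": "claims_final",
     "branch_flag": BranchFlags.no},    # plain task_1 >> task_3 pipeline
]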


RE: Can airflow dags have branches that both run, but do not converge?

Posted by "Shaw, Damian P. " <da...@credit-suisse.com>.
Hi Reed,

Your DAG creation shouldn’t be changing dynamically. The DAG itself in Airflow is fairly static: if in your DAG definition file you dynamically change your DAG based on conditions, Airflow is simply going to think the tasks you are no longer defining don’t exist anymore. That’s what Airflow is telling you: you are no longer defining that task, so it can’t find it anymore.
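
As a contrived sketch of what I mean (not your DAG; names are made up): the
task below only exists when the file happens to be parsed with ENABLE_EXTRA=1,
so flipping that condition makes the task vanish from the parsed DAG even
though old runs and UI entries may still reference it.

import os
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

dag = DAG('parse_time_sketch', schedule_interval='@daily', start_date=days_ago(2))

with dag:
    always = DummyOperator(task_id='always')
    if os.environ.get('ENABLE_EXTRA') == '1':   # evaluated every time the file is parsed
        extra = DummyOperator(task_id='extra')
        always >> extra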

To get conditional branches you have to use the Branch Operators (either the BranchPythonOperator or sub-class the BaseBranchOperator):
https://airflow.apache.org/docs/stable/concepts.html?highlight=branch#branching
https://www.astronomer.io/guides/airflow-branch-operator/


Damian
