Posted to users@airflow.apache.org by Avi Levi <av...@theneura.com> on 2021/06/03 07:20:39 UTC

Create dependencies between emr steps

Hi,
How can I create dependencies between EMR steps? Do I need to create a
step watcher between each one as below (option 1), or can they depend on
each other directly, without the step_watcher (option 2)? Meaning
something like this:

step1 = EmrAddStepsOperator(
    task_id="step1",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    aws_conn_id="aws_default",
    steps=STEP1,
    dag=dag,
)

step2 = EmrAddStepsOperator(
    task_id="step2",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    aws_conn_id="aws_default",
    steps=STEP2,
    dag=dag,
)

step1_watcher = EmrStepSensor(
    task_id="step1_watcher",
    job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('step1', key='return_value')[0] }}",
    aws_conn_id="aws_default",
    dag=dag,
)

step2_watcher = EmrStepSensor(
    task_id="step2_watcher",
    job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('step2', key='return_value')[0] }}",
    aws_conn_id="aws_default",
    dag=dag,
)

option 1:  start_pipeline >> create_emr_cluster >> step1 >> step1_watcher >> step2 >> step2_watcher >> terminate

option 2:  start_pipeline >> create_emr_cluster >> step1 >> step2 >> step2_watcher >> terminate

Re: Create dependencies between emr steps

Posted by Daniel Standish <dp...@gmail.com>.
Well, for the first one, something like this:

from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor

STEP1 = None  # placeholder for your real step definitions
dag = None  # placeholder for your real DAG object


class MyEmrAddStepsOperator(EmrAddStepsOperator):
    def __init__(self, create_cluster_task_id, **kwargs):
        self.create_cluster_task_id = create_cluster_task_id
        super().__init__(**kwargs)

    @property
    def step_sensor_task(self):
        # note the quoting: the task ids must land inside quotes in the
        # rendered Jinja template
        return EmrStepSensor(
            task_id=f"{self.task_id}_sensor",
            job_flow_id=f"{{{{ task_instance.xcom_pull('{self.create_cluster_task_id}', key='return_value') }}}}",
            step_id=f"{{{{ task_instance.xcom_pull('{self.task_id}', key='return_value')[0] }}}}",
            aws_conn_id=self.aws_conn_id,
            dag=self.dag,
        )


step_1_task = MyEmrAddStepsOperator(
    task_id="step1",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    aws_conn_id="aws_default",
    steps=STEP1,
    dag=dag,
    create_cluster_task_id="create_emr_cluster",
)

step_1_sensor = step_1_task.step_sensor_task

step_1_task >> step_1_sensor
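A quick aside on the quadruple braces in the property above: an f-string consumes one level of braces, so `{{{{` renders as the literal `{{` that Jinja needs. A pure-Python sanity check (no Airflow required; the task ids are just illustrative):

```python
# f-strings halve doubled braces, so {{{{ renders as the literal {{
# that a Jinja template needs.
create_cluster_task_id = "create_emr_cluster"  # illustrative task id
task_id = "step1"  # illustrative task id

job_flow_id = (
    f"{{{{ task_instance.xcom_pull('{create_cluster_task_id}', "
    f"key='return_value') }}}}"
)
step_id = f"{{{{ task_instance.xcom_pull('{task_id}', key='return_value')[0] }}}}"

print(job_flow_id)
# {{ task_instance.xcom_pull('create_emr_cluster', key='return_value') }}
```

The rendered strings are exactly the hand-written templates from the original DAG, with the task ids filled in.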

You could even contain them within a task group (google "airflow task
groups") to make them look like one task in the UI. That's mostly
cosmetic, but task groups also let you set dependencies between whole
groups, which in your case might be convenient.
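A minimal sketch of that idea, assuming Airflow 2.x with the amazon provider (the group and task names are made up for illustration). One gotcha: inside a task group, task ids get the group prefix, so the xcom_pull must reference 'step1_group.add_step1':

```python
from airflow.utils.task_group import TaskGroup

# Sketch only: wrap one add-steps task and its sensor in a task group so
# the pair renders as a single node in the UI. Assumes dag, STEP1,
# EmrAddStepsOperator, and EmrStepSensor from above.
with TaskGroup(group_id="step1_group", dag=dag) as step1_group:
    add_step1 = EmrAddStepsOperator(
        task_id="add_step1",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
        aws_conn_id="aws_default",
        steps=STEP1,
        dag=dag,
    )
    watch_step1 = EmrStepSensor(
        task_id="watch_step1",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
        # group-prefixed task id, since add_step1 lives inside step1_group
        step_id="{{ task_instance.xcom_pull(task_ids='step1_group.add_step1', key='return_value')[0] }}",
        aws_conn_id="aws_default",
        dag=dag,
    )
    add_step1 >> watch_step1

# groups then chain like tasks:
# start_pipeline >> create_emr_cluster >> step1_group >> step2_group >> terminate
```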

On the topic of combining, one way you could do it is to copy the
polling/waiting logic from the sensor and put it in a subclass like this:


class MyOtherEmrAddStepsOperator(EmrAddStepsOperator):
    def wait_step(self):
        """put wait logic here"""

    def execute(self, context):
        super().execute(context)
        self.wait_step()
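The wait logic itself is essentially a poll-until-terminal-state loop. Here is a minimal, framework-free sketch of that loop: the state names are real EMR step states, but `wait_for_step` and `get_state` are made-up names; in practice you would copy what EmrStepSensor.poke does and fetch the state via the EMR hook / boto3:

```python
import time

# Terminal EMR step states; only COMPLETED counts as success.
FAILED_STATES = {"CANCELLED", "FAILED", "INTERRUPTED"}


def wait_for_step(get_state, poke_interval=30, timeout=3600):
    """Poll get_state() until the step completes, fails, or times out.

    get_state is a zero-arg callable returning the current step state,
    e.g. a thin wrapper around emr_client.describe_step(...).
    """
    waited = 0
    while True:
        state = get_state()
        if state == "COMPLETED":
            return state
        if state in FAILED_STATES:
            raise RuntimeError(f"EMR step ended in state {state}")
        if waited >= timeout:
            raise TimeoutError(f"step still {state} after {timeout}s")
        time.sleep(poke_interval)
        waited += poke_interval
```

With that, `wait_step` in the subclass above would just call `wait_for_step` with a closure around the step id pulled from the operator's own result.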


Re: Create dependencies between emr steps

Posted by Avi Levi <av...@theneura.com>.
Thanks Daniel.
I am pretty new to Airflow. Your suggestions sound great, but can you add
a reference to some implementation examples?

Best
Avi


Re: Create dependencies between emr steps

Posted by Daniel Standish <dp...@gmail.com>.
>
> So do i have to go on option 1 ? meaning add step_checker after each step
> to verify completion


One thing you can do to make this less onerous is to subclass
EmrAddStepsOperator and add a property that produces the sensor it needs.

So you would have something like

task1 = EmrAddStepsOperator(...)
task1_sensor = task1.sensor_task
task1 >> task1_sensor

Another option is to create an EmrAddStepsSynchronousOperator
and do the waiting in the same task.



Re: Create dependencies between emr steps

Posted by Avi Levi <av...@theneura.com>.
Thanks Tom,
But this way there is no dependency between the steps, right? Here you are
just verifying that they are both completed. However, I do want step2 to
depend on step1 completing successfully.
So do I have to go with option 1, i.e. add a step_checker after each step
to verify completion?
option 1:  start_pipeline >> create_emr_cluster >> step1 >> step1_watcher >> step2 >> step2_watcher >> terminate

option 2:  start_pipeline >> create_emr_cluster >> step1 >> step2 >> step2_watcher >> terminate


RE: Create dependencies between emr steps

Posted by Tom Korrison <to...@concirrus.com>.
Hi,

I only have one add_step task but a step_sensor for each step added.

e.g.

start_daily_pipeline = DummyOperator(
    task_id="start_daily_pipeline",
    dag=dag
)

cluster_creator = EmrCreateJobFlowOperator(
    task_id="create_job_flow",
    aws_conn_id="aws_role_default",
    emr_conn_id="emr_default",
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    dag=dag,
)

step_adder = EmrAddStepsOperator(
    task_id="add_steps",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id="aws_role_default",
    steps=SPARK_STEPS,
    dag=dag,
)

step1_checker = EmrStepSensor(
    task_id="watch_step_1",
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
    aws_conn_id="aws_role_default",
    dag=dag,
)

step2_checker = EmrStepSensor(
    task_id="watch_step_2",
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[1] }}",
    aws_conn_id="aws_role_default",
    dag=dag,
)

job_flow_checker = EmrJobFlowSensor(
    task_id="watch_job_flow",
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id="aws_role_default",
    dag=dag,
)

cluster_remover = EmrTerminateJobFlowOperator(
    task_id="remove_cluster",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id="aws_role_default",
    dag=dag,
)


start_daily_pipeline >> cluster_creator >> step_adder
step_adder >> [step1_checker, step2_checker] >> job_flow_checker >> cluster_remover
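If step2 should not even be watched until step1 has succeeded, the two sensors can be chained instead of listed in parallel. A sketch reusing the same task variables as above; note also that within a single add_steps submission, EMR runs the steps in order by default (step concurrency 1), so the parallel sensors above still observe sequential execution on the cluster:

```python
# Sequential variant: step2_checker only starts once step1_checker
# has seen step 1 complete.
start_daily_pipeline >> cluster_creator >> step_adder
step_adder >> step1_checker >> step2_checker >> job_flow_checker >> cluster_remover
```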

