Posted to commits@airflow.apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2016/04/28 23:14:12 UTC

[jira] [Commented] (AIRFLOW-19) How can I have an Operator B iterate over a list returned from upstream by Operator A?

    [ https://issues.apache.org/jira/browse/AIRFLOW-19?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263013#comment-15263013 ] 

Chris Riccomini commented on AIRFLOW-19:
----------------------------------------

The code from the gist is:

{code}
t1 = DistCpSensor(
    task_id='sensor',
    lookback=2,
    distcp_dao=distcp_dao,
    source_conn=source_conn,
    parent_paths=['/tmp/testAirflowDistCp'],
    timeout=12*60*60,
    dag=dag)

def run_distcp_on_each(*args, **kwargs):
    ti = kwargs['ti']
    pprint(kwargs)
    to_process = ti.xcom_pull(task_ids=None, key='to_process')
    pprint(to_process)
    for work in to_process:
        t3 = DistCpOperator(
            task_id='distcp_command',
            source_conn=source_conn,
            work=work,
            dag=dag)

        t4 = BashOperator(
            task_id='run_distcp',
            bash_command="{{ ti.xcom_pull(task_ids='distcp_command') }}",
            xcom_push=True,
            env=os.environ.copy(),
            dag=dag)

        t5 = BashOperator(
            task_id='get_application_id',
            bash_command="echo {{ ti.xcom_pull(task_ids='run_distcp') }} | awk '{print $NF}' | sed 's/job/application/g'",
            xcom_push=True,
            env=os.environ.copy(),
            dag=dag)

        t6 = DistCpMonitor(
            task_id='monitor',
            application_id="{{ ti.xcom_pull(task_ids='get_application_id') }}",
            resource_manager_conn=resource_manager_conn,
            dag=dag)

        t7 = DistCpSensorCompletionOperator(
            task_id='mysql_update',
            distcp_dao=distcp_dao,
            dag=dag)

        t3.set_upstream(t2)
        t4.set_upstream(t3)
        t5.set_upstream(t4)
        t6.set_upstream(t5)
        t7.set_upstream(t6)

t2 = PythonOperator(
    task_id='run_distcp_on_each',
    provide_context=True,
    python_callable=run_distcp_on_each,
    dag=dag)

t2.set_upstream(t1)
{code}
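
For comparison, a minimal sketch of one alternative that is not taken from the gist: instead of constructing operators inside the callable, the callable itself loops over the pulled list and does the per-item work. `StubTaskInstance` and `handle_work` are hypothetical stand-ins so the snippet runs without Airflow; only the `xcom_pull(task_ids=..., key=...)` call shape mirrors the code above.

```python
class StubTaskInstance(object):
    """Hypothetical stand-in for Airflow's TaskInstance (assumption, for illustration)."""

    def __init__(self, xcoms):
        self._xcoms = xcoms  # maps XCom key -> pushed value

    def xcom_pull(self, task_ids=None, key=None):
        # Mirrors only the keyword arguments used in the gist;
        # returns None when the key was never pushed.
        return self._xcoms.get(key)


def handle_work(work):
    # Hypothetical per-item step; a real version would run DistCp for this item.
    return 'processed:%s' % work


def run_distcp_on_each(*args, **kwargs):
    ti = kwargs['ti']
    to_process = ti.xcom_pull(task_ids=None, key='to_process') or []
    # Do the work for each item here instead of creating operators at run time.
    return [handle_work(work) for work in to_process]


ti = StubTaskInstance({'to_process': ['/a', '/b', '/c']})
results = run_distcp_on_each(ti=ti)
print(results)
```

The trade-off of this shape is that every item shares a single task, so a failure on one item fails the whole task.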

> How can I have an Operator B iterate over a list returned from upstream by Operator A?
> --------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-19
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-19
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Praveenkumar Venkatesan
>            Priority: Minor
>              Labels: support
>
> Here is what I am trying to do exactly: https://gist.github.com/praveev/7b93b50746f8e965f7139ecba028490a
> The PythonOperator log shows only the following:
> [2016-04-28 11:56:22,296] {models.py:1041} INFO - Executing <Task(PythonOperator): run_distcp_on_each> on 2016-04-28 11:56:12
> [2016-04-28 11:56:22,350] {python_operator.py:66} INFO - Done. Returned value was: None
> It didn't even print my kwargs and to_process data.
> To simplify: let's say t1 returns 3 elements. I want to iterate over the list and run t2 -> t3 for each element.
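
To make the "3 elements, one t2 -> t3 chain each" idea concrete, here is a sketch in plain Python that only computes the task ids and dependency edges such a fan-out would need. It assumes the list is already known when the DAG is defined, which is not the case in the reported gist. `build_chains` and the id format are hypothetical. Each id carries the element's index because Airflow requires task ids to be unique within a DAG.

```python
def build_chains(elements, steps=('t2', 't3')):
    """Return (task_ids, edges) for one chain of `steps` per element.

    Hypothetical helper: ids look like 't2_0', 't3_0', 't2_1', ...
    """
    task_ids = []
    edges = []  # (upstream_id, downstream_id) pairs
    for index, _element in enumerate(elements):
        chain = ['%s_%d' % (step, index) for step in steps]
        task_ids.extend(chain)
        # Link consecutive steps within this element's chain.
        edges.extend(zip(chain, chain[1:]))
    return task_ids, edges


task_ids, edges = build_chains(['a', 'b', 'c'])
print(task_ids)
print(edges)
```

Each edge pair corresponds to one `downstream.set_upstream(upstream)` call in the style used in the gist.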