Posted to commits@airflow.apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2016/04/28 23:14:12 UTC
[jira] [Commented] (AIRFLOW-19) How can I have an Operator B iterate over a list returned from upstream by Operator A?
[ https://issues.apache.org/jira/browse/AIRFLOW-19?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263013#comment-15263013 ]
Chris Riccomini commented on AIRFLOW-19:
----------------------------------------
The code from the gist is:
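{code}
import os
from pprint import pprint

from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# DistCpSensor, DistCpOperator, DistCpMonitor and DistCpSensorCompletionOperator
# are custom operators; dag, distcp_dao, source_conn and resource_manager_conn
# are also defined elsewhere. None of them appear in the quoted code.

t1 = DistCpSensor(
    task_id='sensor',
    lookback=2,
    distcp_dao=distcp_dao,
    source_conn=source_conn,
    parent_paths=['/tmp/testAirflowDistCp'],
    timeout=12*60*60,
    dag=dag)


def run_distcp_on_each(*args, **kwargs):
    ti = kwargs['ti']
    pprint(kwargs)
    # task_ids=None removes the task filter, so this matches on the key alone
    to_process = ti.xcom_pull(task_ids=None, key='to_process')
    pprint(to_process)
    # NOTE: this loop runs while run_distcp_on_each is already executing on a
    # worker. The operators created here are added only to that process's copy
    # of the DAG; the scheduler never sees them, so they are never run. The
    # fixed task_ids would also repeat on every iteration.
    for work in to_process:
        t3 = DistCpOperator(
            task_id='distcp_command',
            source_conn=source_conn,
            work=work,
            dag=dag)
        # Runs the command string that distcp_command pushed to XCom
        t4 = BashOperator(
            task_id='run_distcp',
            bash_command="{{ ti.xcom_pull(task_ids='distcp_command') }}",
            xcom_push=True,
            env=os.environ.copy(),
            dag=dag)
        # Keeps the last field of run_distcp's output and renames job -> application
        t5 = BashOperator(
            task_id='get_application_id',
            bash_command="echo {{ ti.xcom_pull(task_ids='run_distcp') }} | awk '{print $NF}' | sed 's/job/application/g'",
            xcom_push=True,
            env=os.environ.copy(),
            dag=dag)
        t6 = DistCpMonitor(
            task_id='monitor',
            application_id="{{ ti.xcom_pull(task_ids='get_application_id') }}",
            resource_manager_conn=resource_manager_conn,
            dag=dag)
        t7 = DistCpSensorCompletionOperator(
            task_id='mysql_update',
            distcp_dao=distcp_dao,
            dag=dag)
        t3.set_upstream(t2)
        t4.set_upstream(t3)
        t5.set_upstream(t4)
        t6.set_upstream(t5)
        t7.set_upstream(t6)


t2 = PythonOperator(
    task_id='run_distcp_on_each',
    provide_context=True,
    python_callable=run_distcp_on_each,
    dag=dag)
t2.set_upstream(t1)
{code}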
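For reference, the get_application_id task (t5) echoes the value that run_distcp pushed to XCom (with xcom_push=True, BashOperator pushes the last line written to stdout), keeps its last whitespace-separated field via awk '{print $NF}', and replaces "job" with "application" via sed, presumably to turn a Hadoop job ID into the matching YARN application ID for DistCpMonitor. Below is a plain-Python sketch of that transformation for one line of input; the function name job_to_application_id and the sample log line are hypothetical.

```python
def job_to_application_id(line: str) -> str:
    """Mimic `awk '{print $NF}' | sed 's/job/application/g'` on one line."""
    # str.split() with no argument splits on runs of whitespace, like awk's default FS
    fields = line.split()
    if not fields:
        return ""  # awk prints an empty $NF for a blank line
    # sed's /g flag replaces every occurrence; str.replace does the same
    return fields[-1].replace("job", "application")


# Hypothetical last line of DistCp output, for illustration only
sample = "INFO mapreduce.Job: Running job: job_1461858000000_0001"
print(job_to_application_id(sample))  # → application_1461858000000_0001
```

Doing this in Python (for example inside a PythonOperator) also sidesteps the word splitting and globbing that the shell applies to the unquoted echo argument.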
> How can I have an Operator B iterate over a list returned from upstream by Operator A?
> --------------------------------------------------------------------------------------
>
> Key: AIRFLOW-19
> URL: https://issues.apache.org/jira/browse/AIRFLOW-19
> Project: Apache Airflow
> Issue Type: Bug
> Reporter: Praveenkumar Venkatesan
> Priority: Minor
> Labels: support
>
> Here is exactly what I am trying to do: https://gist.github.com/praveev/7b93b50746f8e965f7139ecba028490a
> The PythonOperator log only shows the following:
> [2016-04-28 11:56:22,296] {models.py:1041} INFO - Executing <Task(PythonOperator): run_distcp_on_each> on 2016-04-28 11:56:12
> [2016-04-28 11:56:22,350] {python_operator.py:66} INFO - Done. Returned value was: None
> It didn't even print my kwargs or the to_process data.
> To simplify: let's say t1 returns 3 elements. I want to iterate over the list and run t2 -> t3 for each element.
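In Airflow the set of tasks in a DAG is fixed when the scheduler parses the DAG file, and task_ids must be unique within a DAG. The operators that the gist creates inside run_distcp_on_each only come into existence while that task is already running on a worker, so they are never scheduled. A common workaround is to loop over a list that is already known when the file is parsed and give each generated task a unique task_id. The sketch below shows only that wiring in plain Python; Task, build_chains, the step2_/step3_ ids and the sample paths are hypothetical stand-ins, not Airflow's API.

```python
class Task:
    """Hypothetical stand-in for an operator: a task_id plus upstream task_ids."""

    def __init__(self, task_id, registry):
        # Mimics the rule that task_ids must be unique within one DAG
        if task_id in registry:
            raise ValueError("duplicate task_id: " + task_id)
        self.task_id = task_id
        self.upstream = []
        registry[task_id] = self

    def set_upstream(self, other):
        self.upstream.append(other.task_id)


def build_chains(items, root, registry):
    """Create one step2 -> step3 chain per item, each downstream of root."""
    chains = []
    for i, item in enumerate(items):
        # Suffix each task_id with the item's index so the ids stay unique
        step2 = Task("step2_%d" % i, registry)
        step3 = Task("step3_%d" % i, registry)
        step2.set_upstream(root)
        step3.set_upstream(step2)
        chains.append((item, step2, step3))
    return chains


registry = {}
t1 = Task("t1", registry)
# Hypothetical list that is available when the DAG file is parsed
paths = ["/tmp/a", "/tmp/b", "/tmp/c"]
for item, step2, step3 in build_chains(paths, t1, registry):
    print(item, step2.task_id, step2.upstream, step3.task_id, step3.upstream)
```

This only helps when the list exists at parse time (for example read from a config file). When the list is produced by t1 at run time, one alternative is to do the per-element work directly inside a single callable, calling the DistCp logic as ordinary functions in a loop instead of creating operators there.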