Posted to dev@airflow.apache.org by zi...@vo.yoo.ro on 2016/07/08 17:58:39 UTC
Help reusing filepath from previous task
Hello.
I'd like to reuse some info produced by one task to feed subsequent S3KeySensor
checks, but after trying several ideas I haven't been able to make this work.
Basically my current DAG is as follows:
copy_data = BashOperator(
    task_id='copy_data',
    # this script copies data to S3
    bash_command='do_stuff.sh',
    dag=dag)

s3_path = 'xxxx'
for data_type in ("fileA", "fileB", "fileC"):
    S3Sensor_task = S3KeySensor(
        task_id='check_' + data_type,
        poke_interval=20,
        timeout=60,
        retry_delay=timedelta(seconds=30),
        bucket_key=s3_path + data_type,
        bucket_name='xxxx',
        s3_conn_id='s3_conn_id',
        dag=dag)
    S3Sensor_task.set_upstream(copy_data)
This works great, but I'd rather avoid duplicating the S3 path in the DAG
and in the do_stuff.sh script.
So I thought of pushing the S3 path from do_stuff.sh to an XCom variable, but
then I need access to that value inside my for loop, roughly as in the sketch below.
I've also tried a SubDag, but once again I did not manage to read the XCom.
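For reference, here is roughly the XCom variant I was attempting (only a
sketch; it assumes BashOperator's xcom_push flag pushes the script's last
stdout line, and that S3KeySensor treats bucket_key as a templated field,
which may not hold on every Airflow version):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.sensors import S3KeySensor

dag = DAG('copy_and_check', start_date=datetime(2016, 7, 1))

# xcom_push=True pushes the last line that do_stuff.sh prints on stdout
# as an XCom keyed by this task_id.
copy_data = BashOperator(
    task_id='copy_data',
    bash_command='do_stuff.sh',  # the script echoes the S3 path as its last line
    xcom_push=True,
    dag=dag)

for data_type in ("fileA", "fileB", "fileC"):
    # The Jinja expression is only rendered at run time; if bucket_key is
    # a templated field, the sensor picks up the pushed path then.
    S3Sensor_task = S3KeySensor(
        task_id='check_' + data_type,
        poke_interval=20,
        timeout=60,
        retry_delay=timedelta(seconds=30),
        bucket_key="{{ ti.xcom_pull(task_ids='copy_data') }}" + data_type,
        bucket_name='xxxx',
        s3_conn_id='s3_conn_id',
        dag=dag)
    S3Sensor_task.set_upstream(copy_data)

As far as I understand, the for loop runs when the DAG file is parsed, while
XComs only exist at run time, so the value can only reach the sensors through
a templated field like this.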
Could someone give me some hints?
Regards
Re: Help reusing filepath from previous task
Posted by Maxime Beauchemin <ma...@gmail.com>.
You could:
copy_data = BashOperator(
    task_id='copy_data',
    bash_command='do_stuff.sh',
    params={'s3_path': s3_path},
    dag=dag)
then reference {{ params.s3_path }} as a templated element in do_stuff.sh
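A fuller sketch of that pattern (names and the aws command are placeholders):
since bash_command values ending in .sh are rendered as Jinja templates, the
script can reference params directly, and the DAG file stays the single place
where the path is defined:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.sensors import S3KeySensor

dag = DAG('copy_and_check', start_date=datetime(2016, 7, 1))

s3_path = 'xxxx'  # defined once, in the DAG file only

copy_data = BashOperator(
    task_id='copy_data',
    bash_command='do_stuff.sh',  # ends in .sh, so the file is Jinja-templated
    params={'s3_path': s3_path},
    dag=dag)

for data_type in ("fileA", "fileB", "fileC"):
    S3Sensor_task = S3KeySensor(
        task_id='check_' + data_type,
        poke_interval=20,
        timeout=60,
        retry_delay=timedelta(seconds=30),
        bucket_key=s3_path + data_type,  # same variable, no duplication
        bucket_name='xxxx',
        s3_conn_id='s3_conn_id',
        dag=dag)
    S3Sensor_task.set_upstream(copy_data)

and do_stuff.sh would then look something like (placeholder command):

#!/bin/bash
# {{ params.s3_path }} is rendered by Airflow before the script runs
aws s3 cp /local/data/ {{ params.s3_path }} --recursive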
Max