Posted to dev@airflow.apache.org by purna pradeep <pu...@gmail.com> on 2018/05/21 17:16:08 UTC

S3KeySensor

Hi ,

I’m trying to evaluate Airflow to see if it suits my needs.

Basically, I would have the following steps in a DAG:

1) Look for a file arriving at a given S3 location (this path is not completely
static); I can use S3KeySensor in this step.

  I should be able to specify whether to look for the latest folder, or for a
folder up to 24 hours or n days old, that contains a _SUCCESS file, as shown below.

  Sample file locations:

  s3a://mybucket/20180425_111447_data1/_SUCCESS

  s3a://mybucket/20180424_111241_data1/_SUCCESS

  s3a://mybucket/20180424_111035_data1/_SUCCESS



2) Invoke a simple REST API using SimpleHttpOperator once the above
dependency is met; I can set step 1 as upstream of step 2.

Does S3KeySensor support step 1 out of the box?

Also, in some cases I may need a DAG without a start date and end date; it
just needs to be triggered once a file is available at a given S3 location.

*Please suggest!*

Re: S3KeySensor

Posted by Rajesh C <ra...@gmail.com>.
The sensor allows wildcards (*), and there is also an S3PrefixSensor, which
might help in some cases. In one of my DAGs, I have a similar structure:

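# Airflow 1.10+ import path; on 1.9 use: from airflow.operators.sensors import S3KeySensor
from airflow.sensors.s3_key_sensor import S3KeySensor

# `dag` is a DAG object defined elsewhere in the file.
wait_on_s3_source_data = S3KeySensor(
    task_id='wait_on_s3_source_data',
    # Date part = ds + 1 day as YYYYMMDD; '*' covers the varying time suffix.
    bucket_key="s3://mybucket_name/data_file_name_{{macros.ds_format(macros.ds_add(ds,1),'%Y-%m-%d','%Y%m%d')}}*/_SUCCESS",
    wildcard_match=True,  # treat bucket_key as a wildcard pattern
    timeout=60*15,        # give up after 15 minutes (value is in seconds)
    retries=4,
    dag=dag)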

I have files like the ones below in my bucket. The date part is fixed and
expected to match exactly based on ds; the time part (after the first 8
digits) varies:

data_file_name_20170912181034/done_marker.csv
data_file_name_20170913181035/done_marker.csv
data_file_name_20170914181027/done_marker.csv
data_file_name_20170915181033/done_marker.csv
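To see what that template resolves to, here is a plain-Python sketch (no Airflow needed) that mimics the two macros with `datetime` and checks the rendered pattern against the key names above with `fnmatch`, which, as far as I can tell, is what S3Hook uses when `wildcard_match=True`. The `done_marker.csv` marker name is taken from the listing above; the helper names are hypothetical and this is an illustration, not Airflow's own code.

```python
import fnmatch
from datetime import datetime, timedelta


def next_day_yyyymmdd(ds):
    """Mimic macros.ds_format(macros.ds_add(ds, 1), '%Y-%m-%d', '%Y%m%d')."""
    day = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=1)
    return day.strftime("%Y%m%d")


def matching_keys(keys, ds, marker="done_marker.csv"):
    """Return the keys that the wildcard pattern rendered for this ds would match."""
    pattern = "data_file_name_{}*/{}".format(next_day_yyyymmdd(ds), marker)
    return [k for k in keys if fnmatch.fnmatch(k, pattern)]


keys = [
    "data_file_name_20170912181034/done_marker.csv",
    "data_file_name_20170913181035/done_marker.csv",
    "data_file_name_20170914181027/done_marker.csv",
    "data_file_name_20170915181033/done_marker.csv",
]

# A run with ds=2017-09-12 waits for the folder dated the following day.
print(matching_keys(keys, "2017-09-12"))
# prints ['data_file_name_20170913181035/done_marker.csv']
```

Note that `*` in fnmatch also matches `/`, so a pattern like this can match keys nested deeper than intended.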

If you can receive a file with *any date* and you need to process it, you
might need to just use the wildcard on the whole bucket and then process
the file based on its name or the data inside. Maybe also add a custom Python
function to archive each file after you process it.
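If you go the whole-bucket route, one way to process files "based on the name" is to pull the timestamp out of each key, handle files oldest first, and then move them aside. A minimal plain-Python sketch: the regex assumes the `data_file_name_YYYYMMDDHHMMSS/` naming shown above, and the `archive/` prefix is a hypothetical destination, not something Airflow provides.

```python
import re
from datetime import datetime

# Assumes keys look like data_file_name_YYYYMMDDHHMMSS/<file>.
KEY_RE = re.compile(r"^data_file_name_(\d{14})/")


def key_timestamp(key):
    """Return the datetime embedded in the key, or None if the key doesn't match."""
    m = KEY_RE.match(key)
    return datetime.strptime(m.group(1), "%Y%m%d%H%M%S") if m else None


def plan_processing(keys, archive_prefix="archive/"):
    """Pair each recognizable key with a (hypothetical) archive key, oldest first."""
    dated = [(key_timestamp(k), k) for k in keys]
    dated = [(ts, k) for ts, k in dated if ts is not None]
    return [(k, archive_prefix + k) for _, k in sorted(dated)]


keys = [
    "data_file_name_20170914181027/done_marker.csv",
    "data_file_name_20170912181034/done_marker.csv",
    "unrelated/readme.txt",
]
for src, dst in plan_processing(keys):
    print(src, "->", dst)
```

In a real DAG the key list would come from an S3 listing, and "archiving" would be a copy followed by a delete, since S3 has no native rename.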

On Mon, May 21, 2018 at 2:59 PM purna pradeep <pu...@gmail.com>
wrote:

> + Joe

Re: S3KeySensor

Posted by Joe Napolitano <jo...@wework.com>.
Great, I think we're in agreement on your definition of static.

In my own experience, working with S3 keys can be painful if you can't
anticipate the key name. I don't think the S3KeySensor will work as it's
written.

There's another operator, S3PrefixSensor, that isn't in the docs but can be
seen just below S3KeySensor in the source here:
https://airflow.apache.org/_modules/sensors.html#S3KeySensor
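Prefix-style checks depend on how S3 lists keys: S3 has no real folders, and a listing with a delimiter such as `/` rolls keys up into "common prefixes". The sketch below emulates only that grouping in plain Python for the sample layout from the question, to show that the "folders" a prefix check sees are full names like `20180425_111447_data1/`, timestamp included. It illustrates S3 listing semantics under those assumptions; it is not S3PrefixSensor's code.

```python
def common_prefixes(keys, prefix="", delimiter="/"):
    """Emulate the CommonPrefixes part of an S3 list call made with a delimiter."""
    found = set()
    for key in keys:
        if not key.startswith(prefix):
            continue
        rest = key[len(prefix):]
        idx = rest.find(delimiter)
        if idx != -1:
            # Everything up to and including the first delimiter after the prefix
            # is rolled up into a single common prefix.
            found.add(prefix + rest[:idx + len(delimiter)])
    return sorted(found)


keys = [
    "20180425_111447_data1/_SUCCESS",
    "20180424_111241_data1/_SUCCESS",
    "20180424_111035_data1/_SUCCESS",
    "20180424_111035_data1/part-00000",
]
print(common_prefixes(keys))
print(common_prefixes(keys, prefix="20180424"))
```

Listing with a date prefix such as `20180424` narrows the candidates, but you still need your own logic to choose among the returned folders.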

That may work for you. Overall, your question was whether Airflow suits
your needs. I think the answer to that is YES, but in the worst case you'll
have to write a custom operator to handle your needs precisely, e.g. by
processing all files that match a prefix "s3a://mybucket/{{date}}*".
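The core of such a custom operator could be a plain function that takes the listed keys and a date and returns only the data files in folders for that date that are complete (contain `_SUCCESS`). A sketch under those assumptions: the layout follows the samples in the question, the `part-00000` data files are hypothetical, and in Airflow the key list would come from an S3 listing such as `S3Hook.list_keys` (check its signature for your version).

```python
def complete_folders_for_date(keys, date, marker="_SUCCESS"):
    """Top-level folders whose names start with `date` (YYYYMMDD) and hold the marker."""
    folders = set()
    for key in keys:
        folder, _, rest = key.partition("/")
        if folder.startswith(date) and rest == marker:
            folders.add(folder + "/")
    return sorted(folders)


def keys_to_process(keys, date, marker="_SUCCESS"):
    """Data keys under the completed folders for `date`, excluding the marker files."""
    ready = complete_folders_for_date(keys, date, marker)
    return sorted(
        k for k in keys
        if any(k.startswith(f) for f in ready) and not k.endswith("/" + marker)
    )


# Hypothetical listing relative to s3a://mybucket/.
keys = [
    "20180424_111241_data1/_SUCCESS",
    "20180424_111241_data1/part-00000",
    "20180424_111035_data1/part-00000",  # no _SUCCESS yet, so not ready
    "20180425_111447_data1/_SUCCESS",
    "20180425_111447_data1/part-00000",
]

print(keys_to_process(keys, "20180424"))
# prints ['20180424_111241_data1/part-00000']
```

Keeping this logic free of S3 calls makes it easy to test; the operator would only need to list keys, call it, and hand the result to the processing step.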

On Mon, May 21, 2018 at 2:59 PM, purna pradeep <pu...@gmail.com>
wrote:

> + Joe

Re: S3KeySensor

Posted by purna pradeep <pu...@gmail.com>.
+ Joe



On Mon, May 21, 2018 at 2:56 PM purna pradeep <pu...@gmail.com>
wrote:

> I do know only to some extent , I mean If you see my sample s3 locations
>
> s3a://mybucket/20180425_111447_data1/_SUCCESS
>
> s3a://mybucket/20180424_111241_data1/_SUCCESS
>
>
>
> The only values which are static in above location are
>
> s3a://mybucket/
>
> data1/_SUCCESS
>
> Now I want to configure tolerance for _SUCCESS file as latest or 1 day
> older based on this configuration it should pick the right time stamp
> folder which has _SUCCESS file

Re: S3KeySensor

Posted by purna pradeep <pu...@gmail.com>.
I know it only to some extent. If you look at my sample S3 locations:

s3a://mybucket/20180425_111447_data1/_SUCCESS

s3a://mybucket/20180424_111241_data1/_SUCCESS

The only static parts of the above locations are:

s3a://mybucket/

data1/_SUCCESS

Now I want to configure a tolerance for the _SUCCESS file, either latest or up
to 1 day older, and based on this configuration it should pick the folder with
the right timestamp that contains a _SUCCESS file.
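The selection described above can be kept separate from the S3 listing: parse the timestamp out of each `YYYYMMDD_HHMMSS_data1/_SUCCESS` key, drop anything outside the configured tolerance, and take the newest. A plain-Python sketch; the key format, the `tolerance` parameter (with `None` meaning "latest, no age limit") and the reference time `now` are assumptions for illustration, and a real sensor would feed it keys from an S3 listing.

```python
import re
from datetime import datetime, timedelta

# Assumed layout: <YYYYMMDD>_<HHMMSS>_data1/_SUCCESS directly under the bucket.
SUCCESS_RE = re.compile(r"^(\d{8}_\d{6})_data1/_SUCCESS$")


def pick_folder(keys, now, tolerance=timedelta(days=1)):
    """Newest folder with a _SUCCESS marker, no later than `now` and no older than
    `now - tolerance`. tolerance=None means "latest" with no age limit.
    Returns None when nothing qualifies."""
    candidates = []
    for key in keys:
        m = SUCCESS_RE.match(key)
        if not m:
            continue
        ts = datetime.strptime(m.group(1), "%Y%m%d_%H%M%S")
        if ts <= now and (tolerance is None or ts >= now - tolerance):
            candidates.append((ts, key.rsplit("/", 1)[0] + "/"))
    return max(candidates)[1] if candidates else None


keys = [
    "20180425_111447_data1/_SUCCESS",
    "20180424_111241_data1/_SUCCESS",
    "20180424_111035_data1/_SUCCESS",
]

print(pick_folder(keys, now=datetime(2018, 4, 25, 12, 0)))
# prints 20180425_111447_data1/
print(pick_folder(keys, now=datetime(2018, 4, 25, 11, 0)))
# prints 20180424_111241_data1/
```

Whether `now` should be the wall-clock time or the run's execution date is a design choice: the execution date makes reruns reproducible, while wall-clock time always looks at the freshest data.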

On Mon, May 21, 2018 at 2:35 PM Joe Napolitano <jo...@wework.com>
wrote:

> Purna, with regards to "this path is not completely static," can you
> clarify what you mean?
>
> Do you mean that you don't know the actual key name beforehand? E.g.
> pertaining to "111447", "111241", and "111035" in your example?

Re: S3KeySensor

Posted by Joe Napolitano <jo...@wework.com>.
Purna, with regards to "this path is not completely static," can you
clarify what you mean?

Do you mean that you don't know the actual key name beforehand? E.g.
pertaining to "111447", "111241", and "111035" in your example?

On Mon, May 21, 2018 at 2:23 PM, Brian Greene <
brian@heisenbergwoodworking.com> wrote:

> I suggest it’ll work for your needs.
>
> Sent from a device with less than stellar autocorrect

Re: S3KeySensor

Posted by Brian Greene <br...@heisenbergwoodworking.com>.
I suggest it’ll work for your needs.

Sent from a device with less than stellar autocorrect

> On May 21, 2018, at 10:16 AM, purna pradeep <pu...@gmail.com> wrote:
> 
> Hi ,
> 
> I’m trying to evaluate airflow to see if it suits my needs.
> 
> Basically i can have below steps in a DAG
> 
> 
> 
> 1)Look for a file arrival on given s3 location (this path is not completely
> static) (i can use S3Keysensor in this step)
> 
>  i should be able to specify to look either for latest folder or 24hrs or
> n number of days older folder which has _SUCCESS file as mentioned below
> 
>  sample file location(s):
> 
>  s3a://mybucket/20180425_111447_data1/_SUCCESS
> 
>  s3a://mybucket/20180424_111241_data1/_SUCCESS
> 
>  s3a://mybucket/20180424_111035_data1/_SUCCESS
> 
> 
> 
> 2)invoke a simple restapi using HttpSimpleOperator once the above
> dependency is met ,i can set upstream for step2 as step1
> 
> 
> 
> Does S3keysensor supports step1 out of the box?
> 
> Also in some cases i may to have a DAG without start date & end date it
> just needs to be triggered once file is available in a given s3 location
> 
> 
> 
> *Please suggest !*