You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@airflow.apache.org by ro...@goshift.com, ro...@goshift.com on 2018/08/09 08:25:17 UTC

Modeling rate limited api calls in airflow

Hello,

I am in the process of migrating a bespoke data pipe line built around celery into airflow.

We have a number of different tasks which interact with the Adwords API which has a rate limiting policy. The policy isn't a fixed number of requests its variable.

In our celery code we have handled this by capturing a rate limit error response and setting a key in redis to make sure that no tasks execute against the API until it's expired. Any task that does get executed checks for the presence of the key and if the key exists issues a retry for when the rate limit is due to expire.

Moving over to Airflow I can't find a way to go about scheduling a task to retry in a specific amount of time. Doing some reading it seems a Sensor could work to prevent other dags from executing whilst the rate limit is present.

I also can't seem to find an example of handling different exceptions from a python task and adapting the retry logic accordingly.

Any pointers would be much appreciated,

Rob

Re: Modeling rate limited api calls in airflow

Posted by Robin Edwards <ro...@goshift.com>.
Thanks Gerard, that's really helpful it would have taken me some time to
pinpoint that race condition.

I will go with your suggestion and implement a hook and manage the logic
within the operator its self,

Rob

On Sun, Aug 12, 2018 at 9:28 AM, Gerard Toonstra <gt...@gmail.com>
wrote:

> This is worth a design discussion in its own right, but here's my input.
>
> You're using a DAG with sensor operators to determine if something needs to
> be triggered.
> There is a time between the sensor "ok-ing" the progression and the dag
> being triggered and the
> first task being spun up. This interval can easily lead to a race
> conditions where another sensor elsewhere still
> sees a non-rate limited condition and may also initiate the dag. It's
> likely a rate limit will result from that.
>
> Second, should there still be a rate limit in effect, then the operators in
> the DAG won't respect the back-off
> period from there, because you passed that check already.
>
>
> For that reason I'd do this slightly differently in a more managed way. I
> don't have sufficient background with
> the business requirements and how many adwords related work there is in
> total, but here are three options to look into:
>
> - Make the adwords hook, which raises RateLimitException for example, then
> let the operator respond to that and
>   manage redis. FIrst check with redis when it starts, then call adwords
> and in case of failure, update redis and probably
>   go into the retry loop. You can set a low interval here, because it will
> check with redis anyway, that way you can support
>   back off periods of any resolution.
>
> - Just use a pool with the number of required simultaneous processes and
> play with the variables and rates to avoid the
>   rate limit in the first place. That way, you can maximize the API usage
> without creating a stampeding herd that will probably
>   lead to failure anyway.
>
> - There's another approach thinkable where a dag "requests" the use of the
> API by inserting a record in a queue in redis,
>   where the main dag does the actual triggering (so that all scheduling is
> centralized), but that's like building a scheduler in a
>   scheduler and in the end, a pool would give you the same functionality
> without all the hassle.
>
> Rgds,
>
> Gerard
>
> On Fri, Aug 10, 2018 at 12:41 PM Robin Edwards <ro...@goshift.com> wrote:
>
> > Thanks Gerard,
> >
> > Yea pools look really useful for limiting concurrent requests.
> >
> > Where you mention the use of a hook would you simply raise an exception
> > from get_conn() should the adwords account be rate limited then just
> > configure a number of retries and appropriate delay / back off on the
> > operator doing work with the api?
> >
> > I have come up with part of a solution using a key sensor and trigger dag
> > run. The idea would be that when my 'adwords_func' encounters a rate
> limit
> > error it sets a key in redis with an expiry matching the period in the
> rate
> > limit response then re-triggers the dag which will block on my sensor
> until
> > the key has expired.
> >
> > The hard part is now getting this mechanism to work within a sub dag as I
> > have multiple api operations that need limiting.
> >
> > def _adwords_rate_limited_dag(dag_id, adwords_func, max_dag_retries,
> > **kwargs):
> >     dag = DAG(dag_id, **kwargs)
> >
> >     def count_retries(context, obj):
> >         retries = context['dag_run'].conf.get('dag_retries', 1)
> >
> >         if retries > max_dag_retries:
> >             raise SystemError("Max retries reached for dag")
> >
> >         obj.payload = {'dag_retries': retries + 1}
> >
> >         return obj
> >
> >     with dag:
> >         RedisNoKeySensor(
> >             task_id='check_for_rate_limit',
> >             key='rate_limited',
> >             redis_conn_id='redis_master',
> >             poke_interval=10
> >         ) >> PythonOperator(
> >             task_id=shift_callable.__name__,
> >             python_callable=adwords_callable,
> >         ) >> TriggerDagRunOperator(
> >             task_id='retry_dag_on_failure',
> >             trigger_dag_id=dag_id,
> >             trigger_rule=TriggerRule.ONE_FAILED,
> >             python_callable=count_retries
> >         )
> >
> >     return dag
> >
> > Thanks for your help,
> >
> > Rob
> >
> >
> > On Thu, Aug 9, 2018 at 7:07 PM, Gerard Toonstra <gt...@gmail.com>
> > wrote:
> >
> > > Have you looked into pools?  Pools allow you to specify how many tasks
> at
> > > any given time should use a common resource.
> > > That way you could limit this to 1, 2, or 3 for example. Pools are not
> > > dynamic however, so it only allows you to upper limit how many
> > > number of clients are going to hit the API at any moment, not determine
> > how
> > > many when the rate limit is in effect
> > > (unless.... you use code to reconfigure the pool on demand, but I'm not
> > > sure if I should recommend that, i.e. reconfigure the # of clients
> > > on the basis of hitting the rate limit.)  It sounds as if this logic is
> > > best introduced at the hook level, where it determines that it passes
> > > out an API interface only when the rate limit is not in place, where
> > > operators specify how many retries should occur.
> > >
> > > The Adwords API does allow increasing the rate limit threshold though
> and
> > > you're probably better off negotiating
> > > with Google to up that threshold, explaining your business case etc.?
> > >
> > > Gerard
> > >
> > >
> > >
> > > On Thu, Aug 9, 2018 at 10:43 AM rob@goshift.com <ro...@goshift.com>
> wrote:
> > >
> > > > Hello,
> > > >
> > > > I am in the process of migrating a bespoke data pipe line built
> around
> > > > celery into airflow.
> > > >
> > > > We have a number of different tasks which interact with the Adwords
> API
> > > > which has a rate limiting policy. The policy isn't a fixed number of
> > > > requests its variable.
> > > >
> > > > In our celery code we have handled this by capturing a rate limit
> error
> > > > response and setting a key in redis to make sure that no tasks
> execute
> > > > against the API until it's expired. Any task that does get executed
> > > checks
> > > > for the presence of the key and if the key exists issues a retry for
> > when
> > > > the rate limit is due to expire.
> > > >
> > > > Moving over to Airflow I can't find a way to go about scheduling a
> task
> > > to
> > > > retry in a specific amount of time. Doing some reading it seems a
> > Sensor
> > > > could work to prevent other dags from executing whilst the rate limit
> > is
> > > > present.
> > > >
> > > > I also can't seem to find an example of handling different exceptions
> > > from
> > > > a python task and adapting the retry logic accordingly.
> > > >
> > > > Any pointers would be much appreciated,
> > > >
> > > > Rob
> > > >
> > >
> >
>

Re: Modeling rate limited api calls in airflow

Posted by Gerard Toonstra <gt...@gmail.com>.
This is worth a design discussion in its own right, but here's my input.

You're using a DAG with sensor operators to determine if something needs to
be triggered.
There is a time between the sensor "ok-ing" the progression and the dag
being triggered and the
first task being spun up. This interval can easily lead to a race
conditions where another sensor elsewhere still
sees a non-rate limited condition and may also initiate the dag. It's
likely a rate limit will result from that.

Second, should there still be a rate limit in effect, then the operators in
the DAG won't respect the back-off
period from there, because you passed that check already.


For that reason I'd do this slightly differently in a more managed way. I
don't have sufficient background with
the business requirements and how many adwords related work there is in
total, but here are three options to look into:

- Make the adwords hook, which raises RateLimitException for example, then
let the operator respond to that and
  manage redis. FIrst check with redis when it starts, then call adwords
and in case of failure, update redis and probably
  go into the retry loop. You can set a low interval here, because it will
check with redis anyway, that way you can support
  back off periods of any resolution.

- Just use a pool with the number of required simultaneous processes and
play with the variables and rates to avoid the
  rate limit in the first place. That way, you can maximize the API usage
without creating a stampeding herd that will probably
  lead to failure anyway.

- There's another approach thinkable where a dag "requests" the use of the
API by inserting a record in a queue in redis,
  where the main dag does the actual triggering (so that all scheduling is
centralized), but that's like building a scheduler in a
  scheduler and in the end, a pool would give you the same functionality
without all the hassle.

Rgds,

Gerard

On Fri, Aug 10, 2018 at 12:41 PM Robin Edwards <ro...@goshift.com> wrote:

> Thanks Gerard,
>
> Yea pools look really useful for limiting concurrent requests.
>
> Where you mention the use of a hook would you simply raise an exception
> from get_conn() should the adwords account be rate limited then just
> configure a number of retries and appropriate delay / back off on the
> operator doing work with the api?
>
> I have come up with part of a solution using a key sensor and trigger dag
> run. The idea would be that when my 'adwords_func' encounters a rate limit
> error it sets a key in redis with an expiry matching the period in the rate
> limit response then re-triggers the dag which will block on my sensor until
> the key has expired.
>
> The hard part is now getting this mechanism to work within a sub dag as I
> have multiple api operations that need limiting.
>
> def _adwords_rate_limited_dag(dag_id, adwords_func, max_dag_retries,
> **kwargs):
>     dag = DAG(dag_id, **kwargs)
>
>     def count_retries(context, obj):
>         retries = context['dag_run'].conf.get('dag_retries', 1)
>
>         if retries > max_dag_retries:
>             raise SystemError("Max retries reached for dag")
>
>         obj.payload = {'dag_retries': retries + 1}
>
>         return obj
>
>     with dag:
>         RedisNoKeySensor(
>             task_id='check_for_rate_limit',
>             key='rate_limited',
>             redis_conn_id='redis_master',
>             poke_interval=10
>         ) >> PythonOperator(
>             task_id=shift_callable.__name__,
>             python_callable=adwords_callable,
>         ) >> TriggerDagRunOperator(
>             task_id='retry_dag_on_failure',
>             trigger_dag_id=dag_id,
>             trigger_rule=TriggerRule.ONE_FAILED,
>             python_callable=count_retries
>         )
>
>     return dag
>
> Thanks for your help,
>
> Rob
>
>
> On Thu, Aug 9, 2018 at 7:07 PM, Gerard Toonstra <gt...@gmail.com>
> wrote:
>
> > Have you looked into pools?  Pools allow you to specify how many tasks at
> > any given time should use a common resource.
> > That way you could limit this to 1, 2, or 3 for example. Pools are not
> > dynamic however, so it only allows you to upper limit how many
> > number of clients are going to hit the API at any moment, not determine
> how
> > many when the rate limit is in effect
> > (unless.... you use code to reconfigure the pool on demand, but I'm not
> > sure if I should recommend that, i.e. reconfigure the # of clients
> > on the basis of hitting the rate limit.)  It sounds as if this logic is
> > best introduced at the hook level, where it determines that it passes
> > out an API interface only when the rate limit is not in place, where
> > operators specify how many retries should occur.
> >
> > The Adwords API does allow increasing the rate limit threshold though and
> > you're probably better off negotiating
> > with Google to up that threshold, explaining your business case etc.?
> >
> > Gerard
> >
> >
> >
> > On Thu, Aug 9, 2018 at 10:43 AM rob@goshift.com <ro...@goshift.com> wrote:
> >
> > > Hello,
> > >
> > > I am in the process of migrating a bespoke data pipe line built around
> > > celery into airflow.
> > >
> > > We have a number of different tasks which interact with the Adwords API
> > > which has a rate limiting policy. The policy isn't a fixed number of
> > > requests its variable.
> > >
> > > In our celery code we have handled this by capturing a rate limit error
> > > response and setting a key in redis to make sure that no tasks execute
> > > against the API until it's expired. Any task that does get executed
> > checks
> > > for the presence of the key and if the key exists issues a retry for
> when
> > > the rate limit is due to expire.
> > >
> > > Moving over to Airflow I can't find a way to go about scheduling a task
> > to
> > > retry in a specific amount of time. Doing some reading it seems a
> Sensor
> > > could work to prevent other dags from executing whilst the rate limit
> is
> > > present.
> > >
> > > I also can't seem to find an example of handling different exceptions
> > from
> > > a python task and adapting the retry logic accordingly.
> > >
> > > Any pointers would be much appreciated,
> > >
> > > Rob
> > >
> >
>

Re: Modeling rate limited api calls in airflow

Posted by Robin Edwards <ro...@goshift.com>.
Thanks Gerard,

Yea pools look really useful for limiting concurrent requests.

Where you mention the use of a hook would you simply raise an exception
from get_conn() should the adwords account be rate limited then just
configure a number of retries and appropriate delay / back off on the
operator doing work with the api?

I have come up with part of a solution using a key sensor and trigger dag
run. The idea would be that when my 'adwords_func' encounters a rate limit
error it sets a key in redis with an expiry matching the period in the rate
limit response then re-triggers the dag which will block on my sensor until
the key has expired.

The hard part is now getting this mechanism to work within a sub dag as I
have multiple api operations that need limiting.

def _adwords_rate_limited_dag(dag_id, adwords_func, max_dag_retries,
**kwargs):
    dag = DAG(dag_id, **kwargs)

    def count_retries(context, obj):
        retries = context['dag_run'].conf.get('dag_retries', 1)

        if retries > max_dag_retries:
            raise SystemError("Max retries reached for dag")

        obj.payload = {'dag_retries': retries + 1}

        return obj

    with dag:
        RedisNoKeySensor(
            task_id='check_for_rate_limit',
            key='rate_limited',
            redis_conn_id='redis_master',
            poke_interval=10
        ) >> PythonOperator(
            task_id=shift_callable.__name__,
            python_callable=adwords_callable,
        ) >> TriggerDagRunOperator(
            task_id='retry_dag_on_failure',
            trigger_dag_id=dag_id,
            trigger_rule=TriggerRule.ONE_FAILED,
            python_callable=count_retries
        )

    return dag

Thanks for your help,

Rob


On Thu, Aug 9, 2018 at 7:07 PM, Gerard Toonstra <gt...@gmail.com> wrote:

> Have you looked into pools?  Pools allow you to specify how many tasks at
> any given time should use a common resource.
> That way you could limit this to 1, 2, or 3 for example. Pools are not
> dynamic however, so it only allows you to upper limit how many
> number of clients are going to hit the API at any moment, not determine how
> many when the rate limit is in effect
> (unless.... you use code to reconfigure the pool on demand, but I'm not
> sure if I should recommend that, i.e. reconfigure the # of clients
> on the basis of hitting the rate limit.)  It sounds as if this logic is
> best introduced at the hook level, where it determines that it passes
> out an API interface only when the rate limit is not in place, where
> operators specify how many retries should occur.
>
> The Adwords API does allow increasing the rate limit threshold though and
> you're probably better off negotiating
> with Google to up that threshold, explaining your business case etc.?
>
> Gerard
>
>
>
> On Thu, Aug 9, 2018 at 10:43 AM rob@goshift.com <ro...@goshift.com> wrote:
>
> > Hello,
> >
> > I am in the process of migrating a bespoke data pipe line built around
> > celery into airflow.
> >
> > We have a number of different tasks which interact with the Adwords API
> > which has a rate limiting policy. The policy isn't a fixed number of
> > requests its variable.
> >
> > In our celery code we have handled this by capturing a rate limit error
> > response and setting a key in redis to make sure that no tasks execute
> > against the API until it's expired. Any task that does get executed
> checks
> > for the presence of the key and if the key exists issues a retry for when
> > the rate limit is due to expire.
> >
> > Moving over to Airflow I can't find a way to go about scheduling a task
> to
> > retry in a specific amount of time. Doing some reading it seems a Sensor
> > could work to prevent other dags from executing whilst the rate limit is
> > present.
> >
> > I also can't seem to find an example of handling different exceptions
> from
> > a python task and adapting the retry logic accordingly.
> >
> > Any pointers would be much appreciated,
> >
> > Rob
> >
>

Re: Modeling rate limited api calls in airflow

Posted by Gerard Toonstra <gt...@gmail.com>.
Have you looked into pools?  Pools allow you to specify how many tasks at
any given time should use a common resource.
That way you could limit this to 1, 2, or 3 for example. Pools are not
dynamic however, so it only allows you to upper limit how many
number of clients are going to hit the API at any moment, not determine how
many when the rate limit is in effect
(unless.... you use code to reconfigure the pool on demand, but I'm not
sure if I should recommend that, i.e. reconfigure the # of clients
on the basis of hitting the rate limit.)  It sounds as if this logic is
best introduced at the hook level, where it determines that it passes
out an API interface only when the rate limit is not in place, where
operators specify how many retries should occur.

The Adwords API does allow increasing the rate limit threshold though and
you're probably better off negotiating
with Google to up that threshold, explaining your business case etc.?

Gerard



On Thu, Aug 9, 2018 at 10:43 AM rob@goshift.com <ro...@goshift.com> wrote:

> Hello,
>
> I am in the process of migrating a bespoke data pipe line built around
> celery into airflow.
>
> We have a number of different tasks which interact with the Adwords API
> which has a rate limiting policy. The policy isn't a fixed number of
> requests its variable.
>
> In our celery code we have handled this by capturing a rate limit error
> response and setting a key in redis to make sure that no tasks execute
> against the API until it's expired. Any task that does get executed checks
> for the presence of the key and if the key exists issues a retry for when
> the rate limit is due to expire.
>
> Moving over to Airflow I can't find a way to go about scheduling a task to
> retry in a specific amount of time. Doing some reading it seems a Sensor
> could work to prevent other dags from executing whilst the rate limit is
> present.
>
> I also can't seem to find an example of handling different exceptions from
> a python task and adapting the retry logic accordingly.
>
> Any pointers would be much appreciated,
>
> Rob
>