Posted to dev@airflow.apache.org by "Cieplucha, Michal" <mi...@intel.com> on 2017/10/18 12:58:30 UTC

too many long running tasks in sensors

Hello,

We are using sensors for lightweight but long-running tasks, e.g. monitoring the results of tests executed on a remote machine. We would like to see each such task in a DAG, but with so many tasks/DAGs in the running state we will soon hit the maximum number of processes/DAG runs (defined in airflow.cfg). Is it possible to have a sensor which, instead of sleeping and holding on to its process, would just exit and be rescheduled later? Are we using sensors in the wrong manner?

Thanks
mC

--------------------------------------------------------------------
I am an Intel employee. All comments and opinions are my own and do not represent the views of Intel.
--------------------------------------------------------------------


Re: too many long running tasks in sensors

Posted by Ash Berlin-Taylor <as...@firemirror.com>.
This isn't possible with Sensors as they exist right now. Our solution was not to make it a sensor but a normal operator, and to set a number of retries and a large retry_delay:

from datetime import timedelta

checkTask = FindTriggerFileForExecutionPeriod(
    task_id="trigger_file_for_period",
    s3_conn_id="s3_default",
    prefix=source_s3_uri,
    dag=dag,

    # Try every 6 hours for 3 days, just in case the trigger is delayed.
    retries=12,
    retry_delay=timedelta(hours=6)
)

When it fails, it shows up as "Up for retry" (yellow) in the UI, but doesn't take up a worker slot.
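
For the remote-test-monitoring case from the original question, the same pattern reduces to a minimal sketch like the one below. It is untested; poll_remote_results is a hypothetical stand-in for whatever SSH/HTTP call you use to query the remote machine, and the import paths are the Airflow 1.x ones:

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class CheckRemoteTestResults(BaseOperator):
    """
    Check once whether the remote test results are ready, and fail if
    they are not, so that retries/retry_delay set on the task do the
    waiting instead of an in-process sleep.
    """

    @apply_defaults
    def __init__(self, poll_remote_results, **kwargs):
        # poll_remote_results: a callable returning the results, or None
        # if they are not ready yet (hypothetical -- plug in your own check).
        self.poll_remote_results = poll_remote_results
        super().__init__(**kwargs)

    def execute(self, context):
        results = self.poll_remote_results()
        if results is None:
            # Failing frees the worker slot; with retries set, the task
            # goes to "up for retry" and is run again after retry_delay.
            raise AirflowException("Remote test results not ready yet")
        return results  # a non-None return value is pushed to XCom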

The operator is defined as:

import fnmatch
from os.path import basename

from airflow.hooks.S3_hook import S3Hook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class FindTriggerFileForExecutionPeriod(BaseOperator):
    """
    Find any trigger file for the execution period.

    In the unlikely event that there are multiple trigger files the most recent
    (by date in filename, not mtime) will be returned.
    """
    trigger_file_wildcard = 'xxx*_*.gz.trigger'

    @apply_defaults
    def __init__(self, s3_conn_id, prefix, **kwargs):
        self.s3_conn_id = s3_conn_id
        self.prefix = prefix
        super().__init__(**kwargs)

    def execute(self, context):
        hook = S3Hook(self.s3_conn_id)
        (bucket_name, prefix) = hook.parse_s3_url(self.prefix)
        min_date = context['execution_date']
        max_date = context['next_execution_date']

        for key in sorted(hook.list_keys(bucket_name, prefix), reverse=True):
            if not fnmatch.fnmatch(basename(key), self.trigger_file_wildcard):
                continue
            # _tapad_key_content_date is a helper (defined elsewhere)
            # that parses the date embedded in the filename.
            when = _tapad_key_content_date(key)

            # We have sorted the keys, so if we get past the range we are
            # interested in we're not going to find anything.
            if when < min_date:
                break

            if when < max_date:
                # _generate_iso_prefix is another helper defined elsewhere.
                self.xcom_push(context, "content_date", _generate_iso_prefix(when))
                return "s3://{}/{}".format(bucket_name, key)

        raise RuntimeError("No trigger file found with filename date in interval [{start}, {end})".format(
            start=min_date,
            end=max_date,
        ))
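
Downstream tasks can then pick up both the returned S3 URI (stored under XCom's default "return_value" key) and the explicitly pushed "content_date". A sketch of such a consumer, with a made-up task_id and callable, using the Airflow 1.x PythonOperator:

from airflow.operators.python_operator import PythonOperator


def process_trigger_file(**context):
    ti = context["ti"]
    # The operator's return value lands under the default XCom key.
    s3_uri = ti.xcom_pull(task_ids="trigger_file_for_period")
    content_date = ti.xcom_pull(task_ids="trigger_file_for_period",
                                key="content_date")
    # ... download and process s3_uri here ...


process = PythonOperator(
    task_id="process_trigger_file",
    python_callable=process_trigger_file,
    provide_context=True,  # Airflow 1.x needs this to pass **context
    dag=dag,
)
checkTask >> process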