You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@airflow.apache.org by Michal K <po...@yahoo.com.INVALID> on 2016/10/17 10:24:45 UTC

Suspend and resume Sensors

Hello Everyone,
our company is currently considering Airflow for the engine of a pretty big workflow system.We use Python 3 and have some HA requirements, so we plan to use Celery with multiple worker and scheduler nodes and RabbitMQ as message bus. We are also using Docker containers for deployment.
We have a specific case, which we need to handle in our workflows. Some tasks, which we trigger are very long lived (many hours long). These long tasks are carried out by external systems, independent of the Airflow nodes. In our workflow need to start, then monitor these external tasks and trigger downstream operators based on their results.
This seems like a perfect job for Airflow sensors. Here however, we run into a small problem. In our testing it seems not possible to stop an Airflow worker while a sensor is running. The sensor continues to run during a Celery warm shutdown and it's background process remains active even after a cold shutdown. This prevents us from being able to safely destroy the worker's Docker container.
Since at least one of the external tasks we need to monitor is practically always running, we would not be able to stop our Airflow workers in order to deploy code changes to our DAGs or the libraries they depend on.
It would be perfect for our case to have the ability to suspend an Airflow sensor while shutting down the worker and resume it when the worker restarts.
I was wondering how this could be implemented in Airflow and I came up with this initial idea:https://github.com/postrational/incubator-airflow/commit/74ac6f7290d6838362ff437e228465bb49fe198f
The code adds a signal handler to the BaseSensorOperator, which raises an exception if it detects that the worker is shutting down (SIGINT detected).
Later on it handles the exception in the 'run' method of the TaskInstance. The idea is to put the sensor in a state which would cause it to be cleanly resumed after Airflow comes back up after a restart.
So far, my draft code works some of the time, but not always. Sometimes the sensor resumes correctly, but sometimes it doesn't trigger its downstream operator and the whole DAG run is marked as "failed".
I noticed the following line in the sensor's logs:[2016-10-13 19:15:17,033] {jobs.py:1976} WARNING - State of this instance has been externally set to None. Taking the poison pill. So long.
I would like to ask you how to properly mark an operator to be restarted cleanly.
Before I proceed any further I would also like to ask for your opinion of this approach. How do you handle long running sensors? Do you use Docker with Airflow?
Any advice would be greatly appreciated. Thanks,
Michal

Re: Suspend and resume Sensors

Posted by Li Xuan Ji <xu...@gmail.com>.
Hi Michal,

I took a quick look at your patch, does your `airflow run` process
kill itself after it receives a SIGINT? I noticed it catches the
`AirflowRetryException` but doesn't re-raise it.

Otherwise, does the log line ("WARNING - State of this instance has
been externally set to None. Taking the poison pill. So long.") come
from a process from before you restart airflow or after?

> The sensor continues to run during a Celery warm shutdown and it's background process remains active even after a cold shutdown.

I looked into this before and we inherited this behavior from Celery
(http://docs.celeryproject.org/en/latest/userguide/workers.html#stopping-the-worker).
With a long-running sensor what will happen is that Celery receives a
SIGTERM and waits for the task (sensor) to finish. Celery by default
will wait forever. I have an open PR to change this
(https://github.com/apache/incubator-airflow/pull/1732) which might do
what you want, but it seems some people rely on the existing behavior.

Regards,
Xuanji



On 17 October 2016 at 11:54, Maxime Beauchemin
<ma...@gmail.com> wrote:
> I think you the behavior you want may be exactly what will happen provided
> that you setup your sensors to retry when they fail.
>
> Let me describe a play-by-play of how things should happen
> * sensor is running, doing its sensor thing (for this sake let's say it's
> waiting for a file in HDFS to show up), note that it has to be a
> "stateless" sensor to work well here, the filename it looks up is "static"
> in this example, if the task restarts, it will look at the exact same
> filename.
> * the worker on which this sensor runs goes dark, let's say it's rebooted
> * task instance is still marked as running as far as the state of airflow,
> but no hearbeats for that task instance are emmited / recorded anymore
> * the scheduler's process monitors for heartbeats, and sees that task
> instance X has not emitted a heartbeat in the required threshold amount of
> time, it decides to fail the task, which triggers the normal post-failure
> hooks and email alerting if the task is setup that way
> * assuming the task was setup with let's say 5 retries task becomes in an
> up_for_retry state, with a retry count of 0
> * scheduler re-fires the sensor, cranks up the count of retry to 1 and the
> task is running again
> * sensor wakes up every retry_interval, checks if the criteria is met
> * sensor finds the criteria is met, task instance status is set to success
> * downstream tasks fire
>
> If your sensor isn't stateless by nature (let's say it gets a jobid from a
> remote service when it starts, and hold it in memory to make periodic calls
> to track completion), somehow you have to make stateless. Maybe using XCom
> to put the persisted state in the Airflow db, or perhaps some sort of
> contract like a predictable, atomic signal for the task to check.
>
> Max
>
> On Mon, Oct 17, 2016 at 3:24 AM, Michal K <po...@yahoo.com.invalid>
> wrote:
>
>> Hello Everyone,
>> our company is currently considering Airflow for the engine of a pretty
>> big workflow system.We use Python 3 and have some HA requirements, so we
>> plan to use Celery with multiple worker and scheduler nodes and RabbitMQ as
>> message bus. We are also using Docker containers for deployment.
>> We have a specific case, which we need to handle in our workflows. Some
>> tasks, which we trigger are very long lived (many hours long). These long
>> tasks are carried out by external systems, independent of the Airflow
>> nodes. In our workflow need to start, then monitor these external tasks and
>> trigger downstream operators based on their results.
>> This seems like a perfect job for Airflow sensors. Here however, we run
>> into a small problem. In our testing it seems not possible to stop an
>> Airflow worker while a sensor is running. The sensor continues to run
>> during a Celery warm shutdown and it's background process remains active
>> even after a cold shutdown. This prevents us from being able to safely
>> destroy the worker's Docker container.
>> Since at least one of the external tasks we need to monitor is practically
>> always running, we would not be able to stop our Airflow workers in order
>> to deploy code changes to our DAGs or the libraries they depend on.
>> It would be perfect for our case to have the ability to suspend an Airflow
>> sensor while shutting down the worker and resume it when the worker
>> restarts.
>> I was wondering how this could be implemented in Airflow and I came up
>> with this initial idea:https://github.com/postrational/incubator-
>> airflow/commit/74ac6f7290d6838362ff437e228465bb49fe198f
>> The code adds a signal handler to the BaseSensorOperator, which raises an
>> exception if it detects that the worker is shutting down (SIGINT detected).
>> Later on it handles the exception in the 'run' method of the TaskInstance.
>> The idea is to put the sensor in a state which would cause it to be cleanly
>> resumed after Airflow comes back up after a restart.
>> So far, my draft code works some of the time, but not always. Sometimes
>> the sensor resumes correctly, but sometimes it doesn't trigger its
>> downstream operator and the whole DAG run is marked as "failed".
>> I noticed the following line in the sensor's logs:[2016-10-13
>> 19:15:17,033] {jobs.py:1976} WARNING - State of this instance has been
>> externally set to None. Taking the poison pill. So long.
>> I would like to ask you how to properly mark an operator to be restarted
>> cleanly.
>> Before I proceed any further I would also like to ask for your opinion of
>> this approach. How do you handle long running sensors? Do you use Docker
>> with Airflow?
>> Any advice would be greatly appreciated. Thanks,
>> Michal



-- 
Im Xuan Ji!

Re: Suspend and resume Sensors

Posted by Maxime Beauchemin <ma...@gmail.com>.
I think you the behavior you want may be exactly what will happen provided
that you setup your sensors to retry when they fail.

Let me describe a play-by-play of how things should happen
* sensor is running, doing its sensor thing (for this sake let's say it's
waiting for a file in HDFS to show up), note that it has to be a
"stateless" sensor to work well here, the filename it looks up is "static"
in this example, if the task restarts, it will look at the exact same
filename.
* the worker on which this sensor runs goes dark, let's say it's rebooted
* task instance is still marked as running as far as the state of airflow,
but no hearbeats for that task instance are emmited / recorded anymore
* the scheduler's process monitors for heartbeats, and sees that task
instance X has not emitted a heartbeat in the required threshold amount of
time, it decides to fail the task, which triggers the normal post-failure
hooks and email alerting if the task is setup that way
* assuming the task was setup with let's say 5 retries task becomes in an
up_for_retry state, with a retry count of 0
* scheduler re-fires the sensor, cranks up the count of retry to 1 and the
task is running again
* sensor wakes up every retry_interval, checks if the criteria is met
* sensor finds the criteria is met, task instance status is set to success
* downstream tasks fire

If your sensor isn't stateless by nature (let's say it gets a jobid from a
remote service when it starts, and hold it in memory to make periodic calls
to track completion), somehow you have to make stateless. Maybe using XCom
to put the persisted state in the Airflow db, or perhaps some sort of
contract like a predictable, atomic signal for the task to check.

Max

On Mon, Oct 17, 2016 at 3:24 AM, Michal K <po...@yahoo.com.invalid>
wrote:

> Hello Everyone,
> our company is currently considering Airflow for the engine of a pretty
> big workflow system.We use Python 3 and have some HA requirements, so we
> plan to use Celery with multiple worker and scheduler nodes and RabbitMQ as
> message bus. We are also using Docker containers for deployment.
> We have a specific case, which we need to handle in our workflows. Some
> tasks, which we trigger are very long lived (many hours long). These long
> tasks are carried out by external systems, independent of the Airflow
> nodes. In our workflow need to start, then monitor these external tasks and
> trigger downstream operators based on their results.
> This seems like a perfect job for Airflow sensors. Here however, we run
> into a small problem. In our testing it seems not possible to stop an
> Airflow worker while a sensor is running. The sensor continues to run
> during a Celery warm shutdown and it's background process remains active
> even after a cold shutdown. This prevents us from being able to safely
> destroy the worker's Docker container.
> Since at least one of the external tasks we need to monitor is practically
> always running, we would not be able to stop our Airflow workers in order
> to deploy code changes to our DAGs or the libraries they depend on.
> It would be perfect for our case to have the ability to suspend an Airflow
> sensor while shutting down the worker and resume it when the worker
> restarts.
> I was wondering how this could be implemented in Airflow and I came up
> with this initial idea:https://github.com/postrational/incubator-
> airflow/commit/74ac6f7290d6838362ff437e228465bb49fe198f
> The code adds a signal handler to the BaseSensorOperator, which raises an
> exception if it detects that the worker is shutting down (SIGINT detected).
> Later on it handles the exception in the 'run' method of the TaskInstance.
> The idea is to put the sensor in a state which would cause it to be cleanly
> resumed after Airflow comes back up after a restart.
> So far, my draft code works some of the time, but not always. Sometimes
> the sensor resumes correctly, but sometimes it doesn't trigger its
> downstream operator and the whole DAG run is marked as "failed".
> I noticed the following line in the sensor's logs:[2016-10-13
> 19:15:17,033] {jobs.py:1976} WARNING - State of this instance has been
> externally set to None. Taking the poison pill. So long.
> I would like to ask you how to properly mark an operator to be restarted
> cleanly.
> Before I proceed any further I would also like to ask for your opinion of
> this approach. How do you handle long running sensors? Do you use Docker
> with Airflow?
> Any advice would be greatly appreciated. Thanks,
> Michal