Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/08/25 17:56:37 UTC

[GitHub] [airflow] jeffvswanson opened a new issue #17834: Python Sensor cannot be turned into a Smart Sensor

jeffvswanson opened a new issue #17834:
URL: https://github.com/apache/airflow/issues/17834


   <!--
   Welcome to Apache Airflow!
   
   Please complete the next sections or the issue will be closed.
   -->
   
   **Apache Airflow version**: 2.0.2
   
   <!-- AIRFLOW VERSION IS MANDATORY -->
   
   **OS**: Amazon Linux AMI
   
   <!-- MANDATORY! You can get it via `cat /etc/os-release` for example -->
   
   **Apache Airflow Provider versions**: apache-airflow-providers-amazon, v2.1.0
   
   <!-- You can use `pip freeze | grep apache-airflow-providers` (you can leave only relevant ones)-->
   
   **Deployment**: AWS MWAA
   
   <!-- e.g. Virtualenv / VM / Docker-compose / K8S / Helm Chart / Managed Airflow Service -->
   
   <!-- Please include your deployment tools and versions: docker-compose, k8s, helm, etc -->
   
   **What happened**: Setting up a smart Python sensor in AWS MWAA always reverts to a normal Python Sensor task, because `SensorInstance` cannot JSON-serialize the sensor's Python callable.
   
   Error message
   ```
   [2021-08-25 16:41:57,686] {{taskinstance.py:1301}} WARNING - Failed to register in sensor service.Continue to run task in non smart sensor mode.
   [2021-08-25 16:41:57,705] {{taskinstance.py:1303}} ERROR - Object of type function is not JSON serializable
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1298, in _prepare_and_execute_task_with_callbacks
       registered = task_copy.register_in_sensor_service(self, context)
     File "/usr/local/lib/python3.7/site-packages/airflow/sensors/base.py", line 163, in register_in_sensor_service
       return SensorInstance.register(ti, poke_context, execution_context)
     File "/usr/local/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
       return func(*args, session=session, **kwargs)
     File "/usr/local/lib/python3.7/site-packages/airflow/models/sensorinstance.py", line 114, in register
       encoded_poke = json.dumps(poke_context)
     File "/usr/lib64/python3.7/json/__init__.py", line 231, in dumps
       return _default_encoder.encode(obj)
     File "/usr/lib64/python3.7/json/encoder.py", line 199, in encode
       chunks = self.iterencode(o, _one_shot=True)
     File "/usr/lib64/python3.7/json/encoder.py", line 257, in iterencode
       return _iterencode(o, 0)
     File "/usr/lib64/python3.7/json/encoder.py", line 179, in default
       raise TypeError(f'Object of type {o.__class__.__name__} '
   TypeError: Object of type function is not JSON serializable
   [2021-08-25 16:41:57,726] {{python.py:72}} INFO - Poking callable: <function _sla_check at 0x7f57ee835f80>
   ```
   <!-- Please include exact error messages if you can -->
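   
   The failure is reproducible outside Airflow entirely: `json.dumps` rejects any function object, and a function is exactly what ends up in the `poke_context` dict here. A standalone sketch (not Airflow code; the dict shape mimics `poke_context_fields`):
   
   ```
   import json
   
   def _sla_check():
       return True
   
   # Mimics what SensorInstance.register does with poke_context:
   poke_context = {"python_callable": _sla_check, "op_kwargs": {}}
   try:
       json.dumps(poke_context)
   except TypeError as exc:
       print(exc)  # Object of type function is not JSON serializable
   ```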
   
   **What you expected to happen**: I expected a Python Sensor to be registrable as a smart sensor given the appropriate legwork, just as the Amazon provider sensors can be registered as smart sensors.
   
   <!-- What do you think went wrong? -->
   
   **How to reproduce it**:
   1. Set up Apache Airflow environment configuration to support a `SmartPythonSensor` with `poke_context_fields` of `("python_callable", "op_kwargs")`.
   2. Create a child class, `SmartPythonSensor` that inherits from `PythonSensor` registered to be smart sensor compatible.
   3. Write a test DAG with a `SmartPythonSensor` task with a simple python callable.
   4. Watch the logs as the test DAG complains the `SmartPythonSensor` cannot be registered with the sensor service because the python callable the `SmartPythonSensor` relies upon cannot be JSON serialized.
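   
   For reference, step 1 roughly corresponds to an `airflow.cfg` fragment like the following (the `[smart_sensor]` key names are from the Airflow 2.0 smart-sensor docs; the shard values are illustrative, and on MWAA these would be set as configuration overrides):
   
   ```
   [smart_sensor]
   use_smart_sensor = true
   shard_code_upper_limit = 10000
   shards = 5
   sensors_enabled = SmartPythonSensor
   ```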
   
   <!--
   As minimally and precisely as possible. Keep in mind we do not have access to your cluster or dags.
   If this is a UI bug, please provide a screenshot of the bug or a link to a youtube video of the bug in action
   You can include images/screen-casts etc. by drag-dropping the image here.
   -->
   
   **Anything else we need to know**:
   Downgrading the registered smart sensor to a regular sensor task will occur every time a DAG with a `SmartPythonSensor` runs, because `SensorInstance` cannot serialize a Python callable.
   
   My `SmartPythonSensor` class:
   ```
   from airflow.sensors.python import PythonSensor
   from airflow.utils.decorators import apply_defaults
   
   
   class SmartPythonSensor(PythonSensor):
       poke_context_fields = ("python_callable", "op_kwargs")
   
       @apply_defaults
       def __init__(self, **kwargs):
           super().__init__(**kwargs)
   
       def is_smart_sensor_compatible(self):
           # Smart sensor cannot have on success callback
           self.on_success_callback = None
   
           # Smart sensor cannot have on retry callback
           self.on_retry_callback = None
   
           # Smart sensor cannot have on failure callback
           self.on_failure_callback = None
   
           # Smart sensor cannot mark the task as SKIPPED on failure
           self.soft_fail = False
           return super().is_smart_sensor_compatible()
   ```
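   
   One conceivable workaround (a sketch only, untested against MWAA) is to keep the raw callable out of `poke_context_fields` and pass a JSON-friendly dotted-path string instead, resolving it back to a function at poke time. The helper name `resolve_callable` and the dotted path are made up for illustration:
   
   ```
   import importlib
   
   def resolve_callable(dotted_path):
       """Turn a serializable string like "my_pkg.checks._sla_check"
       into the function object it names."""
       module_path, _, func_name = dotted_path.rpartition(".")
       return getattr(importlib.import_module(module_path), func_name)
   
   # Example with a stdlib function:
   dumps = resolve_callable("json.dumps")
   print(dumps({"ok": True}))  # {"ok": true}
   ```
   
   The sensor would still need to be refactored to accept the string form, but the string (unlike the function) survives `json.dumps`.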
   <!--
   How often does this problem occur? Once? Every time etc?
   Any relevant logs to include? Put them here inside fenced
   ``` ``` blocks or inside a foldable details tag if it's long:
   <details><summary>x.log</summary> lots of stuff </details>
   -->
   
   **Are you willing to submit a PR?**
   
   <!---
   This is absolutely not required, but we are happy to guide you in contribution process
   especially if you already have a good understanding of how to implement the fix.
   Airflow is a community-managed project and we love to bring new contributors in.
   Find us in #airflow-how-to-pr on Slack!
    -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] andrewgodwin commented on issue #17834: Python Sensor cannot be turned into a Smart Sensor

andrewgodwin commented on issue #17834:
URL: https://github.com/apache/airflow/issues/17834#issuecomment-906526678


   The problem is that the `wait` bit would need to be in a Trigger - the sensor/operator is not called at all during the deferral process, as it expects a lot of environmental setup that is quite expensive! We should add a nice example of what it takes to do this, though; it's really not very difficult.
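   
   The split described here can be sketched without any Airflow imports: only JSON-serializable constructor kwargs cross the process boundary to the triggerer, where the awaitable "wait bit" is reconstructed from them. The class and field names below are hypothetical, with plain asyncio standing in for the triggerer machinery:
   
   ```
   import asyncio
   import json
   
   class DelayTrigger:
       """Hypothetical stand-in for a Trigger: holds only serializable state."""
   
       def __init__(self, seconds):
           self.seconds = seconds
   
       def serialize(self):
           # A (classpath, kwargs) pair; kwargs must survive json.dumps
           return ("my_dags.triggers.DelayTrigger", {"seconds": self.seconds})
   
       async def run(self):
           # The "wait bit" that runs in the triggerer process
           await asyncio.sleep(self.seconds)
           return {"status": "done"}
   
   classpath, kwargs = DelayTrigger(0.01).serialize()
   json.dumps(kwargs)  # round-trips, unlike a raw callable
   print(asyncio.run(DelayTrigger(**kwargs).run()))  # {'status': 'done'}
   ```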





[GitHub] [airflow] potiuk edited a comment on issue #17834: Python Sensor cannot be turned into a Smart Sensor

potiuk edited a comment on issue #17834:
URL: https://github.com/apache/airflow/issues/17834#issuecomment-906207958


   You are right that we cannot do a "generic" Python operator, but maybe we could have some variant of an AbstractAsyncPythonSensor (abstract class). This would allow you to define a truly async Python sensor. I could imagine an API like this (pseudo-code):
   
   
   ```
   class AbstractAsyncPythonSensor(Sensor):
       def __init__(self, poke_method: Callable):
           ...
   
       async def wait(self):
           raise NotImplementedError()
   ```
   
   
   Then you could define your own SpecificAsyncPythonSensor, where the wait() method would be an async method doing an await internally in the triggerer, while the poke method would actually be executed in the worker after the task is started.
   
   I think that is the closest we can get to an async/awaitable generic Python operator/sensor :)
   
   WDYT? It is a bit of an improvement over always having to write a new class. If you have potentially different sensor logic, but all sensors are woken up by the same kind of event (for example, an SQS message arriving), that might be a nice tool.
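   
   A concrete subclass of this proposed (hypothetical) API might look like the following sketch, with plain asyncio standing in for the triggerer machinery and a lambda standing in for the user's poke logic:
   
   ```
   import asyncio
   
   class AbstractAsyncPythonSensor:
       # Hypothetical base from the proposal above; not a real Airflow class
       def __init__(self, poke_method):
           self.poke_method = poke_method  # runs in the worker after wake-up
   
       async def wait(self):
           raise NotImplementedError()
   
   class SqsAsyncPythonSensor(AbstractAsyncPythonSensor):
       async def wait(self):
           await asyncio.sleep(0)  # stand-in for awaiting an SQS message
   
   sensor = SqsAsyncPythonSensor(poke_method=lambda: True)
   asyncio.run(sensor.wait())   # the awaitable part (triggerer)
   print(sensor.poke_method())  # the poke part (worker) -> True
   ```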
   





[GitHub] [airflow] potiuk commented on issue #17834: Python Sensor cannot be turned into a Smart Sensor

potiuk commented on issue #17834:
URL: https://github.com/apache/airflow/issues/17834#issuecomment-905831938


   It's not supported because it has a "callable", which cannot be serialised.
   
   But FEAR NOUGHT! @andrewgodwin is working on implementing AIP-40 (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177050929), which is a different (and much more comprehensive) approach to this problem, and there - as far as I understand it - any operator (including the PythonOperator) will be able to run in "async" mode.
   
   The bad news: it is going to be available in Airflow 2.2 at the earliest.








[GitHub] [airflow] boring-cyborg[bot] commented on issue #17834: Python Sensor cannot be turned into a Smart Sensor

boring-cyborg[bot] commented on issue #17834:
URL: https://github.com/apache/airflow/issues/17834#issuecomment-905751259


   Thanks for opening your first issue here! Be sure to follow the issue template!
   





[GitHub] [airflow] andrewgodwin commented on issue #17834: Python Sensor cannot be turned into a Smart Sensor

andrewgodwin commented on issue #17834:
URL: https://github.com/apache/airflow/issues/17834#issuecomment-906001148


   It's not _quite_ that flexible, sadly - operators must have an async version written along with an accompanying trigger, and since the trigger runs in a different process it still suffers from the same serialisation problem.
   
   I doubt we'll ever be able to make an arbitrary generic operator (PythonOperator counting as the "most generic" of all operators) run well at scale unless we change the Operator contract to bake in multi-tenancy and serializability from the get-go, which seems... unlikely. Deferrable Operators (AIP-40), and Smart Sensors to a similar extent, require that the operator being made "more efficient" have a central "delay portion" that can be executed from anywhere - for example, waiting on an external system.





[GitHub] [airflow] potiuk closed issue #17834: Python Sensor cannot be turned into a Smart Sensor

potiuk closed issue #17834:
URL: https://github.com/apache/airflow/issues/17834


   




