Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/04/30 18:09:30 UTC

[GitHub] [airflow] jonathanshir opened a new pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

jonathanshir opened a new pull request #8651:
URL: https://github.com/apache/airflow/pull/8651


   This PR is a part of [AIP-31], and closes #8058 
   The goal of this issue is to detach the execution context from the executing method's signature.
   Previously, the only way to retrieve the execution context was via `**kwargs` in the function's signature.
   Now it can be retrieved with a simple external function call (this only works while the function is running inside an execute context).
   This builds on AIP-31 to allow a more functional way of writing code, without being coupled to Airflow's current implementation.
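
To make the idea above concrete, here is a minimal standalone sketch of how such a lookup could work. The names `set_current_context` and `get_current_context` are the ones imported in this PR's tests; the module-level stack, the `RuntimeError` and the `my_task` function are assumptions made for illustration only, not the PR's actual implementation.

```python
import contextlib

# Hypothetical module-level stack; the PR's real storage may differ.
_CONTEXT_STACK = []


@contextlib.contextmanager
def set_current_context(context):
    """Make `context` retrievable for the duration of the with-block."""
    _CONTEXT_STACK.append(context)
    try:
        yield context
    finally:
        _CONTEXT_STACK.pop()


def get_current_context():
    """Return the innermost active context, or fail outside of execution."""
    if not _CONTEXT_STACK:
        # The PR's tests expect AirflowException; RuntimeError keeps this sketch standalone.
        raise RuntimeError("Current context was requested but no context is active")
    return _CONTEXT_STACK[-1]


def my_task():
    # No **kwargs in the signature: the context is fetched on demand.
    context = get_current_context()
    return context["ds"]


# Stand-in for what the task runner would do around execute().
with set_current_context({"ds": "2020-04-30"}):
    result = my_task()
```

Outside the `with` block the lookup fails, which matches the "only works within an execute context" caveat.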
   Collaborated with: @turbaszek @casassg @evgenyshulman 
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #8651: [AIRFLOW-8058] Add configurable execution context

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#issuecomment-739776080


   Hello - is this something we really need to release in 2.0.0rc1?





[GitHub] [airflow] jonathanshir commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
jonathanshir commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r421368510



##########
File path: tests/task/context/test_additional_execute_contextmanager.py
##########
@@ -0,0 +1,134 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+import os
+import sys
+from datetime import datetime, timedelta
+from unittest import mock
+from unittest.mock import MagicMock, Mock
+
+import pytest
+
+from airflow import DAG, configuration
+from airflow.exceptions import AirflowException
+from airflow.models import TaskInstance
+from airflow.models.baseoperator import BaseOperator
+
+DEFAULT_ARGS = {
+    "owner": "test",
+    "depends_on_past": True,
+    "start_date": datetime.utcnow(),

Review comment:
       Alternative suggestion that still runs tests once?







[GitHub] [airflow] turbaszek commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r421604020



##########
File path: docs/howto/use-additional-execute-contextmanager.rst
##########
@@ -0,0 +1,47 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+Defining Additional Execute Context Manager
+===========================================
+
+Creating new context manager
+----------------------------
+
+Users can create their own execution context manager to allow context management on a higher level.
+To do so, one must define a new context manager in one of their files:
+
+    .. code-block:: python
+
+      import contextlib
+
+
+      @contextlib.contextmanager
+      def example_user_context_manager(task_instance, execution_context):
+          """
+          An example of a context manager that a user may provide.
+          """
+          in_context = True
+          try:
+              yield in_context
+          finally:
+              in_context = False

Review comment:
       Can we make this example more detailed? For example with authenticating or checking something before/after execution? 







[GitHub] [airflow] mik-laj commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r419128640



##########
File path: tests/task/context/test_current_context.py
##########
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime, timedelta
+
+import pytest
+
+from airflow import DAG
+from airflow.exceptions import AirflowException
+from airflow.models.baseoperator import BaseOperator
+from airflow.operators.python import PythonOperator
+from airflow.task.context.current import get_current_context, set_current_context
+
+DEFAULT_ARGS = {
+    "owner": "test",
+    "depends_on_past": True,
+    "start_date": datetime(2020, 4, 22),
+    "retries": 1,
+    "retry_delay": timedelta(minutes=1),
+}
+
+
+class TestCurrentContext:
+    def test_current_context_no_context_raise(self):
+        with pytest.raises(AirflowException):
+            get_current_context()
+
+    def test_current_context_roundtrip(self):
+        example_context = {"Hello": "World"}
+
+        with set_current_context(example_context):
+            assert get_current_context() == example_context
+
+    def test_context_removed_after_exit(self):
+        example_context = {"Hello": "World"}
+
+        with set_current_context(example_context):
+            pass
+        with pytest.raises(AirflowException):
+            get_current_context()
+
+    def test_nested_context(self):
+        """
+        Nested execution context can occur in a few cases:
+        1. User uses nested ctx managers.
+        2. User is executing subdags and subruns

Review comment:
       This is not true for Airflow 2.0. In Airflow 2.0, each task is executed by a separate local task job.







[GitHub] [airflow] jonathanshir commented on pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
jonathanshir commented on pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#issuecomment-624005459


   > I don't really understand it yet. Could you provide a practical example?
   
   So let's start with what exists today to give users more visibility into, and control over, Airflow's flow.
   The easiest way to hook into Airflow's compile-time code is the "Task Policy".
   In `airflow.settings.policy` you can see that users can define their own policy, which is called for every task object inside every executing DAG. This gives the user the option to alter task objects at compile time.
   Keeping this in mind, there is currently no real way to alter Airflow variables during execution.
   Our use case at databand.ai is basically validating and running checks on every single task before it is executed, without changing every single Airflow operator!
   It is important to mention that I think some confusion may have arisen from the "bad name" in the configuration:
   This feature only allows you to define a context manager that is entered before a task is executed. It **does not** alter Airflow's context in any way; it only allows the user to do so. We have seen no clear use case for "replacing" Airflow's context, so this is only an **addition**; hence we will be renaming the feature to `additional_execute_contextmanager` instead of `user_defined_execute_context`.
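
As a rough illustration of the "validate every task before it is executed" use case described above, a context manager with the two-argument signature could look like the sketch below. The ownership rule, the `CHECKED` list and the `SimpleNamespace` stand-ins are hypothetical and only serve to keep the example runnable without Airflow.

```python
import contextlib
from types import SimpleNamespace

# Hypothetical audit trail; kept outside the execution context so the context is not altered.
CHECKED = []


@contextlib.contextmanager
def validating_contextmanager(task_instance, execution_context):
    """Hypothetical check that runs around every task's execute()."""
    # Pre-execution check (illustrative rule: every task must declare an owner).
    if not getattr(task_instance.task, "owner", None):
        raise ValueError(f"Task {task_instance.task_id} has no owner")
    try:
        yield
    finally:
        # Post-execution hook; runs whether execute() succeeded or failed.
        CHECKED.append(task_instance.task_id)


# Stand-ins for a real TaskInstance and context dict, used only for this demo.
ti = SimpleNamespace(task_id="extract", task=SimpleNamespace(owner="data-team"))
with validating_contextmanager(ti, {}):
    pass  # the operator's execute() would run here
```

A task that fails the check never reaches the body of the `with` block, so no operator code has to change.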





[GitHub] [airflow] evgenyshulman edited a comment on pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
evgenyshulman edited a comment on pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#issuecomment-624172030


   Continuing this discussion: you actually have a really great use case for this feature - https://github.com/apache/airflow/pull/8432
   This feature would make it possible to provide the GCP context at runtime without changing each and every operator (right now you have changed Bash and Python, but what about all the others?).
   
   So a useful example of a context manager could be something like this (it could be part of the Airflow library, or just defined by the user according to their requirements):
   ```
   import contextlib
   from tempfile import TemporaryDirectory

   from airflow.exceptions import AirflowException


   @contextlib.contextmanager
   def gcp_context(task_instance, context):
       # Placeholder condition: decide from the config which GCP project to use
       # (maybe conf.get('gcp', 'project_id', None)...)
       if not ("some condition on config that provides the gcp project to use"):
           yield None
           return
       gcp_delegate_to = ...
       gcp_conn_id = ...
       with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
           try:
               from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
           except ImportError:
               # DOC_BASE_URL is not defined in this snippet.
               raise AirflowException(
                   'Additional GCP packages are not installed. Please install them to use the '
                   'gcp_conn_id parameter. '
                   'For more information, please look at: '
                   f'{DOC_BASE_URL}/installation')
           hook = GoogleBaseHook(gcp_conn_id=gcp_conn_id, delegate_to=gcp_delegate_to)
           with hook.provide_authorized_gcloud() as authorized_context:
               yield authorized_context  # will not be in use
   ```
   (based on the https://github.com/apache/airflow/pull/8432 implementation)
   





[GitHub] [airflow] kaxil commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r419010640



##########
File path: airflow/utils/example_user_context_manager.py
##########
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+
+
+@contextlib.contextmanager
+def example_user_context_manager(task_instance, execution_context):  # pylint: disable=W0613
+    """
+    An example of a context manager that a user may provide.
+    This implementation does nothing, and is set in place to ensure
+    that user_defined_execute_context always has a correct value
+    """
+    in_context = True
+    try:
+        yield in_context
+    finally:
+        in_context = False

Review comment:
       If it is not needed, let's move this to Docs or some other place










[GitHub] [airflow] jonathanshir edited a comment on pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
jonathanshir edited a comment on pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#issuecomment-624005459


   @BasPH 
   > I don't really understand it yet. Could you provide a practical example?
   
   So let's start with what exists today to give users more visibility into, and control over, Airflow's flow.
   The easiest way to hook into Airflow's compile-time code is the "Task Policy".
   In `airflow.settings.policy` you can see that users can define their own policy, which is called for every task object inside every executing DAG. This gives the user the option to alter task objects at compile time.
   Keeping this in mind, there is currently no real way to alter Airflow variables during execution.
   Our use case at databand.ai is basically validating and running checks on every single task before it is executed, without changing every single Airflow operator!
   @mik-laj 
   It is important to mention that I think some confusion may have arisen from the "bad name" in the configuration:
   This feature only allows you to define a context manager that is entered before a task is executed. It **does not** alter Airflow's context in any way; it only allows the user to do so. We have seen no clear use case for "replacing" Airflow's context, so this is only an **addition**; hence we will be renaming the feature to `additional_execute_contextmanager` instead of `user_defined_execute_context`.





[GitHub] [airflow] kaxil commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r421500411



##########
File path: airflow/config_templates/config.yml
##########
@@ -366,6 +366,17 @@
       type: string
       example: "path.to.CustomXCom"
       default: "airflow.models.xcom.BaseXCom"
+    - name: additional_execute_contextmanager
+      description: |
+        Custom user function that returns a context manager. Syntax is "package.method".
+        Context is entered when operator starts executing task. __enter__() will be called
+        before the operator's execute method, and __exit__() shortly after.
+        Function's signature should accept two positional parameters - task instance
+        and execution context
+      version_added: 2.0.0
+      type: string
+      example: ~

Review comment:
       No, it won't; that is just to show an example in https://airflow.apache.org/docs/stable/configurations-ref.html
   
   It would break if you added that to the `default:` key.
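
For readers unfamiliar with the "package.method" syntax in the option description, the following sketch shows one way a dotted path can be turned into a callable. It uses plain `importlib` and is not Airflow's own loader; `import_from_dotted_path` is a hypothetical helper, and `contextlib.nullcontext` merely stands in for a user-supplied function.

```python
import importlib


def import_from_dotted_path(dotted_path):
    """Resolve 'package.module.attr' to the attribute it names."""
    module_path, _, attr_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"{dotted_path!r} is not of the form 'package.method'")
    module = importlib.import_module(module_path)
    return getattr(module, attr_name)


# contextlib.nullcontext stands in for a user-supplied function here.
factory = import_from_dotted_path("contextlib.nullcontext")
with factory() as value:
    entered_value = value
```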







[GitHub] [airflow] jonathanshir commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
jonathanshir commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r420133626



##########
File path: airflow/config_templates/config.yml
##########
@@ -366,6 +366,17 @@
       type: string
       example: "path.to.CustomXCom"
       default: "airflow.models.xcom.BaseXCom"
+    - name: user_defined_execute_context

Review comment:
       See my explanation below, I think the name "user_defined_execute_context" has misled you :)







[GitHub] [airflow] evgenyshulman edited a comment on pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
evgenyshulman edited a comment on pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#issuecomment-624172030


   Continuing this discussion: you actually have a really great use case for this feature - https://github.com/apache/airflow/pull/8432
   This feature would make it possible to provide the GCP context at runtime without changing each and every operator (right now you have changed Bash and Python, but what about all the others?).
   
   So a useful example of a context manager could be something like this (it could be part of the Airflow library, or just defined by the user according to their requirements):
   ```
   import contextlib
   from tempfile import TemporaryDirectory

   from airflow.exceptions import AirflowException


   @contextlib.contextmanager
   def gcp_context(context):
       # Placeholder condition: decide from the config which GCP project to use
       # (maybe conf.get('gcp', 'project_id', None)...)
       if not ("some condition on config that provides the gcp project to use"):
           yield None
           return
       gcp_delegate_to = ...
       gcp_conn_id = ...
       with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
           try:
               from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
           except ImportError:
               # DOC_BASE_URL is not defined in this snippet.
               raise AirflowException(
                   'Additional GCP packages are not installed. Please install them to use the '
                   'gcp_conn_id parameter. '
                   'For more information, please look at: '
                   f'{DOC_BASE_URL}/installation')
           hook = GoogleBaseHook(gcp_conn_id=gcp_conn_id, delegate_to=gcp_delegate_to)
           with hook.provide_authorized_gcloud() as authorized_context:
               yield authorized_context  # will not be in use
   ```
   (based on the https://github.com/apache/airflow/pull/8432 implementation)
   





[GitHub] [airflow] mik-laj commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r432954063



##########
File path: docs/howto/use-additional-execute-contextmanager.rst
##########
@@ -0,0 +1,68 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+Defining Additional Execute Context Manager
+===========================================
+
+Creating new context manager
+----------------------------
+
+Users can create their own execution context manager to allow context management on a higher level.
+To do so, one must define a new context manager in one of their files. This context manager is entered
+before calling ``execute`` method and is exited shortly after it. Here is an example context manager
+which provides authentication to Google Cloud Platform:
+
+    .. code-block:: python
+
+      import os
+      import subprocess
+      from contextlib import contextmanager
+      from tempfile import TemporaryDirectory
+      from unittest import mock
+      from google.auth.environment_vars import CLOUD_SDK_CONFIG_DIR
+      from airflow.providers.google.cloud.utils.credentials_provider import provide_gcp_conn_and_credentials
+
+      def execute_cmd(cmd):
+          with open(os.devnull, 'w') as dev_null:
+            return subprocess.call(args=cmd, stdout=dev_null, stderr=subprocess.STDOUT)
+
+      @contextmanager
+      def provide_gcp_context(task_instance, execution_context):

Review comment:
       Does execution_context already contain task_instance? It would be nice if the user could expect a common signature for many methods - pre_execute, on_execute, post_execute, etc.
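
On the question above: Airflow's task context does carry the task instance under the `ti` and `task_instance` keys, so a single-argument signature would be workable. The sketch below is a hypothetical variant, not what this PR implements; the `SimpleNamespace` object and the `entered_for` list are stand-ins for illustration.

```python
import contextlib
from types import SimpleNamespace

# Hypothetical record of which tasks entered the context manager.
entered_for = []


@contextlib.contextmanager
def single_argument_contextmanager(context):
    """Hypothetical variant that takes only the context."""
    # The same object is also available as context["task_instance"].
    task_instance = context["ti"]
    entered_for.append(task_instance.task_id)
    yield


# Stand-in context; a real one is built by Airflow at execution time.
fake_ti = SimpleNamespace(task_id="load")
with single_argument_contextmanager({"ti": fake_ti, "task_instance": fake_ti}):
    pass
```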







[GitHub] [airflow] evgenyshulman commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
evgenyshulman commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r440000478



##########
File path: docs/howto/use-additional-execute-contextmanager.rst
##########
@@ -0,0 +1,68 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+Defining Additional Execute Context Manager
+===========================================
+
+Creating new context manager
+----------------------------
+
+Users can create their own execution context manager to allow context management on a higher level.
+To do so, one must define a new context manager in one of their files. This context manager is entered
+before calling ``execute`` method and is exited shortly after it. Here is an example context manager
+which provides authentication to Google Cloud Platform:
+
+    .. code-block:: python
+
+      import os
+      import subprocess
+      from contextlib import contextmanager
+      from tempfile import TemporaryDirectory
+      from unittest import mock
+      from google.auth.environment_vars import CLOUD_SDK_CONFIG_DIR
+      from airflow.providers.google.cloud.utils.credentials_provider import provide_gcp_conn_and_credentials
+
+      def execute_cmd(cmd):
+          with open(os.devnull, 'w') as dev_null:
+            return subprocess.call(args=cmd, stdout=dev_null, stderr=subprocess.STDOUT)
+
+      @contextmanager
+      def provide_gcp_context(task_instance, execution_context):

Review comment:
       pre_execute - good point, we will fix it in the next commit.







[GitHub] [airflow] turbaszek commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r422653688



##########
File path: airflow/models/taskinstance.py
##########
@@ -1110,6 +1116,27 @@ def signal_handler(signum, frame):
             session.merge(self)
         session.commit()
 
+    def get_additional_execution_contextmanager(self, execution_context):
+        """
+        Retrieves the user defined execution context callback from the configuration,
+        and validates that it is indeed a context manager
+
+        :param execution_context: the current execution context to be passed to user ctx
+        """
+        additional_execution_contextmanager = conf.getimport("core", "additional_execute_contextmanager")
+        if additional_execution_contextmanager:
+            try:
+                user_ctx_obj = additional_execution_contextmanager(self, execution_context)
+                if hasattr(user_ctx_obj, "__enter__") and hasattr(user_ctx_obj, "__exit__"):
+                    return user_ctx_obj
+                else:
+                    raise AirflowException(f"Loaded function {additional_execution_contextmanager} "
+                                           f"as additional execution contextmanager, but it does not have "
+                                           f"__enter__ or __exit__ method!")

Review comment:
       Good!







[GitHub] [airflow] jonathanshir edited a comment on pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
jonathanshir edited a comment on pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#issuecomment-624005459


   @BasPH 
   > I don't really understand it yet. Could you provide a practical example?
   
   So let's start with what exists today to give users more visibility into and control over airflow's flow:
   The easiest way to hook into airflow's compile-time code is the "Task Policy".
   In `airflow.settings.policy` you can see that users can define their own policy, which is called for every task object inside every executing DAG. This gives the user the option to alter task objects at compile time.
   Keeping this in mind, there is currently no real way to alter airflow variables during execution.
   Our use case at databand.ai is basically validating and running checks on every single task before it is executed, without changing every single airflow operator!
   It is important to mention that I think some confusion may have arisen from the "bad name" in the configuration:
   This feature only allows you to define a contextmanager that is entered before a task is executed. It **does not** alter airflow's context in any way; it only allows the user to do so. We have seen no clear use case for "replacing" airflow's context, so this is only an **addition**, hence we will be renaming the feature to `additional_execute_contextmanager` instead of `user_defined_execute_context`.
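
For illustration, here is a minimal sketch of the kind of pre-execution check described above. It assumes only the two-argument signature proposed in this PR (`task_instance`, `execution_context`); the `TaskValidationError` class, the owner check, and the stand-in task object are hypothetical and not part of the PR.

```python
import contextlib
from types import SimpleNamespace


class TaskValidationError(Exception):
    """Hypothetical error raised when a pre-execution check fails."""


@contextlib.contextmanager
def validate_before_execute(task_instance, execution_context):
    # Hypothetical check: refuse to run a task that has no owner set.
    task = execution_context.get("task")
    if task is not None and not getattr(task, "owner", None):
        raise TaskValidationError("task has no owner")
    # Control returns to the caller here; the operator would execute now.
    yield


# Usage outside Airflow, with a stand-in object instead of a real task:
ctx = {"task": SimpleNamespace(owner="data-team")}
with validate_before_execute(None, ctx):
    pass  # operator.execute(ctx) would run at this point
```

Because the check lives in one context manager, no individual operator needs to change, which is the point made above.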





[GitHub] [airflow] kaxil commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r419011282



##########
File path: tests/task/context/test_current_context.py
##########
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime, timedelta
+
+import pytest
+
+from airflow import DAG
+from airflow.exceptions import AirflowException
+from airflow.models.baseoperator import BaseOperator
+from airflow.operators.python import PythonOperator
+from airflow.task.context.current import get_current_context, set_current_context
+
+DEFAULT_ARGS = {
+    "owner": "test",
+    "depends_on_past": True,
+    "start_date": datetime(2020, 4, 22),
+    "retries": 1,
+    "retry_delay": timedelta(minutes=1),
+}
+
+
+class TestCurrentContext:
+    def test_current_context_no_context_raise(self):
+        with pytest.raises(AirflowException):
+            get_current_context()
+
+    def test_current_context_roundtrip(self):
+        example_context = {"Hello": "World"}
+
+        with set_current_context(example_context):
+            assert get_current_context() == example_context
+
+    def test_context_removed_after_exit(self):
+        example_context = {"Hello": "World"}
+
+        with set_current_context(example_context):
+            pass
+        with pytest.raises(AirflowException, ):
+            get_current_context()
+
+    def test_nested_context(self):

Review comment:
       Can you add an example of a nested context to the docstring here?







[GitHub] [airflow] jonathanshir commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
jonathanshir commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r422634752



##########
File path: airflow/models/taskinstance.py
##########
@@ -1110,6 +1116,27 @@ def signal_handler(signum, frame):
             session.merge(self)
         session.commit()
 
+    def get_additional_execution_contextmanager(self, execution_context):
+        """
+        Retrieves the user defined execution context callback from the configuration,
+        and validates that it is indeed a context manager
+
+        :param execution_context: the current execution context to be passed to user ctx
+        """
+        additional_execution_contextmanager = conf.getimport("core", "additional_execute_contextmanager")
+        if additional_execution_contextmanager:
+            try:
+                user_ctx_obj = additional_execution_contextmanager(self, execution_context)
+                if hasattr(user_ctx_obj, "__enter__") and hasattr(user_ctx_obj, "__exit__"):
+                    return user_ctx_obj
+                else:
+                    raise AirflowException(f"Loaded function {additional_execution_contextmanager} "
+                                           f"as additional execution contextmanager, but it does not have "
+                                           f"__enter__ or __exit__ method!")

Review comment:
       The tests verify that this works for decorated contextmanagers as well :)
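
A small sketch of why the `hasattr` check quoted above accepts decorated context managers: calling a function wrapped with `contextlib.contextmanager` returns a helper object that implements both `__enter__` and `__exit__`. The names `is_context_manager`, `decorated_manager` and `plain_generator` are made up for this example.

```python
import contextlib


def is_context_manager(obj):
    # Same duck-typing test as in the quoted diff.
    return hasattr(obj, "__enter__") and hasattr(obj, "__exit__")


@contextlib.contextmanager
def decorated_manager(task_instance, execution_context):
    yield


def plain_generator(task_instance, execution_context):
    yield


# The object returned by the decorated function passes the check.
decorated_ok = is_context_manager(decorated_manager(None, {}))
# The decorated function itself (not called) does not.
function_ok = is_context_manager(decorated_manager)
# A bare generator object does not implement the protocol either.
generator_ok = is_context_manager(plain_generator(None, {}))
```

Note that the check runs on the returned object, so the configured callable has to be invoked first, as the quoted method does.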







[GitHub] [airflow] jonathanshir commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
jonathanshir commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r422634854



##########
File path: docs/howto/use-additional-execute-contextmanager.rst
##########
@@ -0,0 +1,47 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+Defining Additional Execute Context Manager
+===========================================
+
+Creating new context manager
+----------------------------
+
+Users can create their own execution context manager to allow context management on a higher level.
+To do so, one must define a new context manager in one of their files:
+
+    .. code-block:: python
+
+      import contextlib
+
+
+      @contextlib.contextmanager
+      def example_user_context_manager(task_instance, execution_context):
+          """
+          An example of a context manager that a user may provide.
+          """
+          in_context = True
+          try:
+              yield in_context
+          finally:
+              in_context = False

Review comment:
       We honestly should incorporate Evgeny's example into this somehow; open to ideas.







[GitHub] [airflow] kaxil commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r419010171



##########
File path: airflow/task/context/current.py
##########
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+import logging
+from typing import Any, Dict
+
+from airflow.exceptions import AirflowException
+
+_CURRENT_CONTEXT = []
+log = logging.getLogger(__name__)
+
+
+@contextlib.contextmanager
+def set_current_context(context):
+    """
+    Sets the current execution context to the provided context object.
+    This method should be called once per execution, before calling operator.execute

Review comment:
       ```suggestion
       Sets the current execution context to the provided context object.
       This method should be called once per Task execution, before calling operator.execute
   ```







[GitHub] [airflow] jonathanshir commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
jonathanshir commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r420133277



##########
File path: tests/task/context/test_current_context.py
##########
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime, timedelta
+
+import pytest
+
+from airflow import DAG
+from airflow.exceptions import AirflowException
+from airflow.models.baseoperator import BaseOperator
+from airflow.operators.python import PythonOperator
+from airflow.task.context.current import get_current_context, set_current_context
+
+DEFAULT_ARGS = {
+    "owner": "test",
+    "depends_on_past": True,
+    "start_date": datetime(2020, 4, 22),
+    "retries": 1,
+    "retry_delay": timedelta(minutes=1),
+}
+
+
+class TestCurrentContext:
+    def test_current_context_no_context_raise(self):
+        with pytest.raises(AirflowException):
+            get_current_context()
+
+    def test_current_context_roundtrip(self):
+        example_context = {"Hello": "World"}
+
+        with set_current_context(example_context):
+            assert get_current_context() == example_context
+
+    def test_context_removed_after_exit(self):
+        example_context = {"Hello": "World"}
+
+        with set_current_context(example_context):
+            pass
+        with pytest.raises(AirflowException, ):
+            get_current_context()
+
+    def test_nested_context(self):
+        """
+        Nested execution context can occur in a few cases:
+        1. User uses nested ctx managers.
+        2. User is executing subdags and subruns

Review comment:
       Updated comment. Even without the subdag case I think this is a good test to ensure context management works as expected







[GitHub] [airflow] BasPH commented on pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
BasPH commented on pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#issuecomment-622065975


   I don't really understand it yet. Could you provide a practical example?
   
   Plus, please don't place code in `utils` unless absolutely necessary. I'm sure we can think of a better named folder :-)





[GitHub] [airflow] potiuk edited a comment on pull request #8651: [AIRFLOW-8058] Add configurable execution context

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#issuecomment-739776080


   Hello - is this something we really need to release in 2.0.0rc1?
   
   If not - can someone set the right milestone please :)? 





[GitHub] [airflow] turbaszek commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r422988794



##########
File path: docs/howto/use-additional-execute-contextmanager.rst
##########
@@ -28,19 +28,18 @@ To do so, one must define a new context manager in one of their files:
 
     .. code-block:: python
 
-      import contextlib
-
+      import os, contextlib
+      from test.test_utils.gcp_system_helpers import provide_gcp_context

Review comment:
       ```suggestion
         from tests.test_utils.gcp_system_helpers import provide_gcp_context
   ```







[GitHub] [airflow] kaxil commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r421460346



##########
File path: tests/task/context/test_additional_execute_contextmanager.py
##########
@@ -0,0 +1,134 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+import os
+import sys
+from datetime import datetime, timedelta
+from unittest import mock
+from unittest.mock import MagicMock, Mock
+
+import pytest
+
+from airflow import DAG, configuration
+from airflow.exceptions import AirflowException
+from airflow.models import TaskInstance
+from airflow.models.baseoperator import BaseOperator
+
+DEFAULT_ARGS = {
+    "owner": "test",
+    "depends_on_past": True,
+    "start_date": datetime.utcnow(),

Review comment:
       Schedule interval as `@once`







[GitHub] [airflow] turbaszek commented on pull request #8651: [AIRFLOW-8058] Add configurable execution context

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#issuecomment-854395276


   > Does anyone still work on it?
   
   I'm afraid not, would you like to pick it up? I think the case for this change was not that strong at that time.





[GitHub] [airflow] ashb closed pull request #8651: [AIRFLOW-8058] Add configurable execution context

Posted by GitBox <gi...@apache.org>.
ashb closed pull request #8651:
URL: https://github.com/apache/airflow/pull/8651


   





[GitHub] [airflow] turbaszek commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r422989025



##########
File path: docs/howto/use-additional-execute-contextmanager.rst
##########
@@ -28,19 +28,18 @@ To do so, one must define a new context manager in one of their files:
 
     .. code-block:: python
 
-      import contextlib
-
+      import os, contextlib

Review comment:
       ```suggestion
         import contextlib
         import os
   ```







[GitHub] [airflow] kaxil commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r421157338



##########
File path: airflow/task/context/current.py
##########
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+import logging
+from typing import Any, Dict
+
+from airflow.exceptions import AirflowException
+
+_CURRENT_CONTEXT = []
+log = logging.getLogger(__name__)
+
+
+@contextlib.contextmanager
+def set_current_context(context):

Review comment:
       Can we add a type hint for `context`?

##########
File path: airflow/config_templates/config.yml
##########
@@ -366,6 +366,17 @@
       type: string
       example: "path.to.CustomXCom"
       default: "airflow.models.xcom.BaseXCom"
+    - name: additional_execute_contextmanager
+      description: |
+        Custom user function that returns a context manager. Syntax is "package.method".
+        Context is entered when operator starts executing task. __enter__() will be called
+        before the operator's execute method, and __exit__() shortly after.
+        Function's signature should accept two positional parameters - task instance
+        and execution context

Review comment:
       ```suggestion
           Context is entered when operator starts executing task. ``__enter__()`` will be called
           before the operator's execute method, and ``__exit__()`` shortly after.
           Function's signature should accept two positional parameters - task instance
           and execution context
   ```
   
   I think you can add what it is equivalent to, to make it clear:
   
   ```python
   @my_context_manager
   def execute(context):
       ...
   ```
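
Spelled out a little further, the equivalence could look like the sketch below: the configured manager is entered, `execute` runs inside it, and the manager is exited afterwards. `EchoOperator` and `run_task` are stand-ins invented for this example, not Airflow classes or functions, and the real call site in `taskinstance.py` is not shown in this thread.

```python
import contextlib

events = []


@contextlib.contextmanager
def my_context_manager(task_instance, execution_context):
    events.append("enter")
    try:
        yield
    finally:
        events.append("exit")


class EchoOperator:
    """Stand-in for an operator; not a real Airflow class."""

    def execute(self, context):
        events.append("execute")
        return context["value"]


def run_task(operator, task_instance, context):
    # Hypothetical simplification of the task runner: wrap execute()
    # in the user-provided context manager.
    with my_context_manager(task_instance, context):
        return operator.execute(context)


result = run_task(EchoOperator(), None, {"value": 42})
```

After the call, `events` records the enter, execute, exit ordering that the option's description refers to.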

##########
File path: airflow/models/taskinstance.py
##########
@@ -1110,6 +1116,26 @@ def signal_handler(signum, frame):
             session.merge(self)
         session.commit()
 
+    def get_additional_execution_contextmanager(self, execution_context):
+        """
+        Retrieves the user defined execution context callback from the configuration,
+        and validates that it is indeed a context manager
+        :param execution_context: the current execution context to be passed to user ctx

Review comment:
       A blank line is needed for correct Sphinx doc rendering
   
   ```suggestion
           and validates that it is indeed a context manager
   
           :param execution_context: the current execution context to be passed to user ctx
   ```

##########
File path: docs/howto/use-additional-execute-contextmanager.rst
##########
@@ -0,0 +1,47 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+Defining Additional Execute Context Manager
+===============================

Review comment:
       ```suggestion
   Defining Additional Execute Context Manager
   ===========================================
   ```

##########
File path: docs/howto/use-additional-execute-contextmanager.rst
##########
@@ -0,0 +1,47 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+Defining Additional Execute Context Manager
+===============================
+
+Creating new context manager
+----------------------------
+
+Users can create their own exeuction context manager to allow context management on a higher level.

Review comment:
       ```suggestion
   Users can create their own execution context manager to allow context management on a higher level.
   ```

##########
File path: docs/howto/use-additional-execute-contextmanager.rst
##########
@@ -0,0 +1,47 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+Defining Additional Execute Context Manager
+===============================
+
+Creating new context manager
+----------------------------
+
+Users can create their own exeuction context manager to allow context management on a higher level.
+To do so, one must define a new context manager in one of their files:
+
+    .. code-block:: python
+
+      import contextlib
+
+
+      @contextlib.contextmanager
+      def example_user_context_manager(task_instance, execution_context):
+          """
+          An example of a context manager that a user may provide.
+          """
+          in_context = True
+          try:
+              yield in_context
+          finally:
+              in_context = False
+
+Notice the context manager's signature, it receives two parameters:
+1. task_instance - the executing task instance object (can also be retrieved from execution context via ``"ti"`` key.

Review comment:
       ```suggestion
   1. ``task_instance`` - the executing task instance object (can also be retrieved from execution context via the ``"ti"`` key).
   ```

##########
File path: docs/howto/use-additional-execute-contextmanager.rst
##########
@@ -0,0 +1,47 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+Defining Additional Execute Context Manager
+===============================
+
+Creating new context manager
+----------------------------
+
+Users can create their own execution context manager to allow context management on a higher level.
+To do so, one must define a new context manager in one of their files:
+
+    .. code-block:: python
+
+      import contextlib
+
+
+      @contextlib.contextmanager
+      def example_user_context_manager(task_instance, execution_context):
+          """
+          An example of a context manager that a user may provide.
+          """
+          in_context = True
+          try:
+              yield in_context
+          finally:
+              in_context = False
+
+Notice the context manager's signature, it receives two parameters:
+1. task_instance - the executing task instance object (can also be retrieved from execution context via ``"ti"`` key.
+2. execution_context - the execution context that is provided to an operator's ``execute`` function.

Review comment:
       ```suggestion
   2. ``execution_context`` - the execution context that is provided to an operator's ``execute`` function.
   ```

##########
File path: airflow/config_templates/config.yml
##########
@@ -366,6 +366,17 @@
       type: string
       example: "path.to.CustomXCom"
       default: "airflow.models.xcom.BaseXCom"
+    - name: additional_execute_contextmanager
+      description: |
+        Custom user function that returns a context manager. Syntax is "package.method".
+        Context is entered when operator starts executing task. __enter__() will be called
+        before the operator's execute method, and __exit__() shortly after.
+        Function's signature should accept two positional parameters - task instance
+        and execution context

Review comment:
       if not here, we should add it to `docs/howto/use-additional-execute-contextmanager.rst`

##########
File path: tests/task/context/test_additional_execute_contextmanager.py
##########
@@ -0,0 +1,134 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+import os
+import sys
+from datetime import datetime, timedelta
+from unittest import mock
+from unittest.mock import MagicMock, Mock
+
+import pytest
+
+from airflow import DAG, configuration
+from airflow.exceptions import AirflowException
+from airflow.models import TaskInstance
+from airflow.models.baseoperator import BaseOperator
+
+DEFAULT_ARGS = {
+    "owner": "test",
+    "depends_on_past": True,
+    "start_date": datetime.utcnow(),

Review comment:
       Using `utcnow()` is discouraged
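
   A minimal sketch of the alternative, assuming a fixed, timezone-aware start date is acceptable for these tests. The specific date is only an illustration; any static value keeps the test runs reproducible.

```python
from datetime import datetime, timedelta, timezone

# A static start date does not change between test runs; the value is arbitrary.
DEFAULT_ARGS = {
    "owner": "test",
    "depends_on_past": True,
    "start_date": datetime(2020, 4, 22, tzinfo=timezone.utc),
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
}
```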

##########
File path: tests/task/context/test_current_context.py
##########
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime, timedelta
+
+import pytest
+
+from airflow import DAG
+from airflow.exceptions import AirflowException
+from airflow.models.baseoperator import BaseOperator
+from airflow.operators.python import PythonOperator
+from airflow.task.context.current import get_current_context, set_current_context
+
+DEFAULT_ARGS = {
+    "owner": "test",
+    "depends_on_past": True,
+    "start_date": datetime.utcnow(),

Review comment:
       same here




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r421602145



##########
File path: airflow/models/taskinstance.py
##########
@@ -1110,6 +1116,27 @@ def signal_handler(signum, frame):
             session.merge(self)
         session.commit()
 
+    def get_additional_execution_contextmanager(self, execution_context):
+        """
+        Retrieves the user defined execution context callback from the configuration,
+        and validates that it is indeed a context manager
+
+        :param execution_context: the current execution context to be passed to user ctx
+        """
+        additional_execution_contextmanager = conf.getimport("core", "additional_execute_contextmanager")
+        if additional_execution_contextmanager:
+            try:
+                user_ctx_obj = additional_execution_contextmanager(self, execution_context)
+                if hasattr(user_ctx_obj, "__enter__") and hasattr(user_ctx_obj, "__exit__"):
+                    return user_ctx_obj
+                else:
+                    raise AirflowException(f"Loaded function {additional_execution_contextmanager} "
+                                           f"as additional execution contextmanager, but it does not have "
+                                           f"__enter__ or __exit__ method!")

Review comment:
       What about `@contextlib.contextmanager`? I think this check works only for context managers defined as a class 
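
   A quick sketch for checking this concern, using only the standard library. Calling a function decorated with `contextlib.contextmanager` returns an object that defines both `__enter__` and `__exit__`, so the `hasattr` check should accept generator-based and class-based context managers alike. All names below are illustrative and not taken from the PR.

```python
import contextlib


@contextlib.contextmanager
def generator_based(task_instance, execution_context):
    # Generator-based context manager, same shape as the docs example.
    yield


class ClassBased:
    # Class-based context manager with explicit protocol methods.
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        return False


def looks_like_context_manager(obj):
    # Same duck-typing check as in the diff above.
    return hasattr(obj, "__enter__") and hasattr(obj, "__exit__")


gen_obj = generator_based(None, {})
cls_obj = ClassBased()
not_a_cm = object()
```

   Note that the check is applied to the object returned by the call, not to the decorated function itself.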







[GitHub] [airflow] evgenyshulman commented on pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
evgenyshulman commented on pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#issuecomment-624172030


   Continuing this discussion: you actually have a really great use case for this feature - https://github.com/apache/airflow/pull/8432
   This feature would make it possible to provide GCP context at runtime without changing each and every operator (right now you have changed Bash and Python, but what about all the others?).
   
   So a useful example of a context manager could be something like this (it can be part of the Airflow library, or just defined by the user in place according to their requirements):
   ```python
   import contextlib
   from tempfile import TemporaryDirectory

   from airflow.exceptions import AirflowException


   @contextlib.contextmanager
   def gcp_context(context):
       # Placeholder condition: replace with a real check on the configuration.
       if not ("some condition on config that provides what gcp project to use (maybe conf.get('gcp', 'project_id', None)..)"):
           yield None
           return
       gcp_delegate_to = ...
       gcp_conn_id = ...
       with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
           try:
               from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
           except ImportError:
               raise AirflowException(
                   'Additional packages gcp are not installed. Please install it to use gcp_conn_id '
                   'parameter. '
                   'For more information, please look at: '
                   f'{DOC_BASE_URL}/installation')
           with GoogleBaseHook(gcp_conn_id=gcp_conn_id, delegate_to=gcp_delegate_to).provide_authorized_gcloud() as gcp_context:
               yield gcp_context  # will not be in use
   ```
   (based on the https://github.com/apache/airflow/pull/8432 implementation)
   





[GitHub] [airflow] kaxil commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r421461363



##########
File path: airflow/config_templates/config.yml
##########
@@ -366,6 +366,17 @@
       type: string
       example: "path.to.CustomXCom"
       default: "airflow.models.xcom.BaseXCom"
+    - name: additional_execute_contextmanager
+      description: |
+        Custom user function that returns a context manager. Syntax is "package.method".
+        Context is entered when operator starts executing task. __enter__() will be called
+        before the operator's execute method, and __exit__() shortly after.
+        Function's signature should accept two positional parameters - task instance
+        and execution context
+      version_added: 2.0.0
+      type: string
+      example: ~

Review comment:
       ```suggestion
         example: my.path. my_context_manager
   ```

##########
File path: airflow/config_templates/config.yml
##########
@@ -366,6 +366,17 @@
       type: string
       example: "path.to.CustomXCom"
       default: "airflow.models.xcom.BaseXCom"
+    - name: additional_execute_contextmanager
+      description: |
+        Custom user function that returns a context manager. Syntax is "package.method".
+        Context is entered when operator starts executing task. __enter__() will be called
+        before the operator's execute method, and __exit__() shortly after.
+        Function's signature should accept two positional parameters - task instance
+        and execution context
+      version_added: 2.0.0
+      type: string
+      example: ~

Review comment:
       ```suggestion
         example: my.path.my_context_manager
   ```







[GitHub] [airflow] evgenyshulman commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
evgenyshulman commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r440000032



##########
File path: docs/howto/use-additional-execute-contextmanager.rst
##########
@@ -0,0 +1,68 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+Defining Additional Execute Context Manager
+===========================================
+
+Creating new context manager
+----------------------------
+
+Users can create their own execution context manager to allow context management on a higher level.
+To do so, one must define a new context manager in one of their files. This context manager is entered
+before calling ``execute`` method and is exited shortly after it. Here is an example context manager
+which provides authentication to Google Cloud Platform:
+
+    .. code-block:: python
+
+      import os
+      import subprocess
+      from contextlib import contextmanager
+      from tempfile import TemporaryDirectory
+      from unittest import mock
+      from google.auth.environment_vars import CLOUD_SDK_CONFIG_DIR
+      from airflow.providers.google.cloud.utils.credentials_provider import provide_gcp_conn_and_credentials
+
+      def execute_cmd(cmd):
+          with open(os.devnull, 'w') as dev_null:
+            return subprocess.call(args=cmd, stdout=dev_null, stderr=subprocess.STDOUT)
+
+      @contextmanager
+      def provide_gcp_context(task_instance, execution_context):

Review comment:
       > @evgenyshulman My solution allows for two use cases (task context, global context). Your proposal allows you to complete only one use case - global context. Is there any reason to keep this context as a global option? Did I miss something? I am not sure which solution is the best and I would like to understand both solutions.
   
   I am in favor of keeping the solution simple. My only concern is that the current policy implementation requires airflow_local_settings, which I don't see in use in most Airflow setups. Also, in our use case we need it at a global level, which is why we did it this way. Do you expect to use it as a policy right after we implement it via policy?







[GitHub] [airflow] turbaszek commented on pull request #8651: [AIRFLOW-8058] Add configurable execution context

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#issuecomment-679023399


   Let's keep it open for a moment





[GitHub] [airflow] kaxil commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r419009982



##########
File path: airflow/models/taskinstance.py
##########
@@ -1009,19 +1010,25 @@ def signal_handler(signum, frame):
                     self.log.error("Failed when executing execute callback")
                     self.log.exception(e3)
 
-                # If a timeout is specified for the task, make it fail
-                # if it goes beyond
-                result = None
-                if task_copy.execution_timeout:
+                with ExitStack() as exit_stack:
+                    from airflow.task.context.current import set_current_context
                     try:
-                        with timeout(int(
-                                task_copy.execution_timeout.total_seconds())):
-                            result = task_copy.execute(context=context)
+                        exit_stack.enter_context(set_current_context(context))
+                        user_defined_exec_context = self.get_user_defined_execute_context(context)
+                        if user_defined_exec_context is not None:
+                            exit_stack.enter_context(user_defined_exec_context)
+
+                        if task_copy.execution_timeout:
+                            # If a timeout is specified for the task, make it fail
+                            # if it goes beyond
+                            timeout_secs = int(
+                                task_copy.execution_timeout.total_seconds())

Review comment:
       ```suggestion
                               timeout_secs = int(task_copy.execution_timeout.total_seconds())
   ```







[GitHub] [airflow] jonathanshir commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
jonathanshir commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r421371079



##########
File path: airflow/config_templates/config.yml
##########
@@ -366,6 +366,17 @@
       type: string
       example: "path.to.CustomXCom"
       default: "airflow.models.xcom.BaseXCom"
+    - name: additional_execute_contextmanager
+      description: |
+        Custom user function that returns a context manager. Syntax is "package.method".
+        Context is entered when operator starts executing task. __enter__() will be called
+        before the operator's execute method, and __exit__() shortly after.
+        Function's signature should accept two positional parameters - task instance
+        and execution context

Review comment:
       I'm not sure that clears things up -- personally it took me a few minutes to understand myself how this is equivalent.
   I think the simplistic explanation is more understandable than the code example.







[GitHub] [airflow] jonathanshir commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
jonathanshir commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r421367824



##########
File path: airflow/task/context/current.py
##########
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+import logging
+from typing import Any, Dict
+
+from airflow.exceptions import AirflowException
+
+_CURRENT_CONTEXT = []
+log = logging.getLogger(__name__)
+
+
+@contextlib.contextmanager
+def set_current_context(context):

Review comment:
       @turbaszek Assuming that we will be leaving the ExecutionContext class outside, I'll add the TypeHinting







[GitHub] [airflow] mik-laj commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r419128100



##########
File path: airflow/models/taskinstance.py
##########
@@ -1110,6 +1116,27 @@ def signal_handler(signum, frame):
             session.merge(self)
         session.commit()
 
+    def get_user_defined_execute_context(self, execution_context):
+        """
+        Retrieves the user defined execution context callback from the configuration,
+        and validates that it is indeed a context manager
+        :param execution_context: the current execution context to be passed to user ctx
+        """
+        path_to_user_context = conf.get("core", "user_defined_execute_context")

Review comment:
       This condition is not needed. getimport returns None when the option value is empty. 







[GitHub] [airflow] mik-laj commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r419129507



##########
File path: airflow/config_templates/config.yml
##########
@@ -366,6 +366,17 @@
       type: string
       example: "path.to.CustomXCom"
       default: "airflow.models.xcom.BaseXCom"
+    - name: user_defined_execute_context

Review comment:
       I am not sure if the user should be able to overwrite this context manager. Can you provide use cases where this is necessary? 
   
   I recently thought a lot about a similar mechanism to be able to define a context that will allow authorization to [gcloud](https://github.com/apache/airflow/pull/8432), or [forward ports via SSH](https://github.com/apache/airflow/issues/8615). However, I am not convinced that this should be defined at the global level.







[GitHub] [airflow] mik-laj commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r419128640



##########
File path: tests/task/context/test_current_context.py
##########
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime, timedelta
+
+import pytest
+
+from airflow import DAG
+from airflow.exceptions import AirflowException
+from airflow.models.baseoperator import BaseOperator
+from airflow.operators.python import PythonOperator
+from airflow.task.context.current import get_current_context, set_current_context
+
+DEFAULT_ARGS = {
+    "owner": "test",
+    "depends_on_past": True,
+    "start_date": datetime(2020, 4, 22),
+    "retries": 1,
+    "retry_delay": timedelta(minutes=1),
+}
+
+
+class TestCurrentContext:
+    def test_current_context_no_context_raise(self):
+        with pytest.raises(AirflowException):
+            get_current_context()
+
+    def test_current_context_roundtrip(self):
+        example_context = {"Hello": "World"}
+
+        with set_current_context(example_context):
+            assert get_current_context() == example_context
+
+    def test_context_removed_after_exit(self):
+        example_context = {"Hello": "World"}
+
+        with set_current_context(example_context):
+            pass
+        with pytest.raises(AirflowException, ):
+            get_current_context()
+
+    def test_nested_context(self):
+        """
+        Nested execution context can occur in a few cases:
+        1. User uses nested ctx managers.
+        2. User is executing subdags and subruns

Review comment:
       This is not true for Airflow 2.0. In Airflow 2.0, each task is executed by a separate worker.







[GitHub] [airflow] mik-laj commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r419128190



##########
File path: airflow/models/taskinstance.py
##########
@@ -1110,6 +1116,27 @@ def signal_handler(signum, frame):
             session.merge(self)
         session.commit()
 
+    def get_user_defined_execute_context(self, execution_context):
+        """
+        Retrieves the user defined execution context callback from the configuration,
+        and validates that it is indeed a context manager
+        :param execution_context: the current execution context to be passed to user ctx
+        """
+        path_to_user_context = conf.get("core", "user_defined_execute_context")
+        if path_to_user_context:
+            try:
+                imported = conf.getimport("core", "user_defined_execute_context")
+                user_ctx_obj = imported(self, execution_context)

Review comment:
       Can you add a check here if the result of the getimport is None?
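
   A rough sketch of the requested check, written as a free function so it runs without Airflow. Here `imported` plays the role of the value returned by `conf.getimport(...)`, and `TypeError` stands in for the `AirflowException` used in the PR; both the function name and the error message are hypothetical.

```python
import contextlib
from typing import Any, Callable, Dict, Optional


def build_user_context(
    imported: Optional[Callable[..., Any]],
    task_instance: Any,
    execution_context: Dict[str, Any],
) -> Optional[contextlib.AbstractContextManager]:
    # Nothing configured: there is no additional context to enter.
    if imported is None:
        return None
    user_ctx_obj = imported(task_instance, execution_context)
    # Accept anything that implements the context manager protocol.
    if hasattr(user_ctx_obj, "__enter__") and hasattr(user_ctx_obj, "__exit__"):
        return user_ctx_obj
    raise TypeError(f"{imported!r} did not return a context manager")


@contextlib.contextmanager
def example_user_context_manager(task_instance, execution_context):
    # Minimal user-provided context manager used to exercise the helper.
    yield
```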







[GitHub] [airflow] turbaszek commented on pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#issuecomment-632079527


   @kaxil @mik-laj what are your thoughts? 





[GitHub] [airflow] stale[bot] commented on pull request #8651: [AIRFLOW-8058] Add configurable execution context

Posted by GitBox <gi...@apache.org>.
stale[bot] commented on pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#issuecomment-678690392


   This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
   





[GitHub] [airflow] kaxil commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r421460690



##########
File path: airflow/config_templates/config.yml
##########
@@ -366,6 +366,17 @@
       type: string
       example: "path.to.CustomXCom"
       default: "airflow.models.xcom.BaseXCom"
+    - name: additional_execute_contextmanager
+      description: |
+        Custom user function that returns a context manager. Syntax is "package.method".
+        Context is entered when operator starts executing task. __enter__() will be called
+        before the operator's execute method, and __exit__() shortly after.
+        Function's signature should accept two positional parameters - task instance
+        and execution context

Review comment:
       Sure, it might just be me







[GitHub] [airflow] evgenyshulman commented on pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
evgenyshulman commented on pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#issuecomment-633530927


   > I still don't understand why the additional execution context is configured as a global option. In my opinion, it would be worthwhile for each task to have its own separate contexts. In other words, the additional context should be a task parameter, not an option in the global file. This will allow the tasks to have a different context and then use an SSH tunnel or gcloud authorization. Such contexts can be parameterized, so that e.g. one task uses an SSH tunnel to connect to server A, and another task connects to server B.
   > 
   > If someone wants one global context, it will also be possible by defining the cluster policy.
   > https://airflow.readthedocs.io/en/latest/concepts.html#cluster-policy
   > 
   > I propose that the context be defined at the task level.
   > 
   > The example context with parameters will look like this.
   > 
   > ```python
   > from contextlib import contextmanager
   > 
   > 
   > def ssh_tunnel(local_port, remote_port):
   > 	@contextmanager
   > 	def ssh_tunnel_context():
   > 		from sshtunnel import SSHTunnelForwarder
   > 
   > 		server = SSHTunnelForwarder(
   > 		    'pahaz.urfuclub.ru',
   > 		    ssh_username="pahaz",
   > 		    ssh_password="secret",
   > 		    remote_bind_address=('127.0.0.1', remote_port),
   > 		    local_bind_address=('127.0.0.1', local_port)
   > 		)
   > 		try:
   > 			server.start()
   > 			yield
   > 		finally:
   > 			server.stop()
   > 	return ssh_tunnel_context
   > ```
   > 
   > The example task will look like this.
   > 
   > ```python
   > task_a = MySQLExecuteQueryOperator(
   > 	task_id='execute_query',
   > 	execution_contexts=[
   > 		ssh_tunnel(3306, 3307),
   > 	]
   > )
   > ```
   > 
   > If you want to define a context that will be used by all operators, you can define the cluster policy as follows.
   > 
   > ```python
   > def my_task_policy(task):
   > 	task.execution_contexts.append(my_global_task_context())
   > ```
   > 
   > What do you think about my proposal? Will it meet your requirements?
   
   It would be a nice option to be able to provide an execution context per operator as well. However, I assume the idea is to be able to define a context that affects all operators, instead of changing them one by one. So sometimes a user will use a context per operator (a new feature to implement), and sometimes a user can just apply a global context, so that every operator gets the GCP connection context regardless of its type (or with some operator type check).





[GitHub] [airflow] mik-laj commented on pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#issuecomment-633973505


   @evgenyshulman My solution allows for two use cases. Your proposal covers only one use case. Is there any reason to keep this context as a global option? Did I miss something? I am not sure which solution is best, and I would like to understand both solutions.





[GitHub] [airflow] jonathanshir commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
jonathanshir commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r421592500



##########
File path: airflow/config_templates/config.yml
##########
@@ -366,6 +366,17 @@
       type: string
       example: "path.to.CustomXCom"
       default: "airflow.models.xcom.BaseXCom"
+    - name: additional_execute_contextmanager
+      description: |
+        Custom user function that returns a context manager. Syntax is "package.method".
+        Context is entered when operator starts executing task. __enter__() will be called
+        before the operator's execute method, and __exit__() shortly after.
+        Function's signature should accept two positional parameters - task instance
+        and execution context
+      version_added: 2.0.0
+      type: string
+      example: ~

Review comment:
       Sorry, I was wrong - example field instead of default field :)







[GitHub] [airflow] turbaszek commented on pull request #8651: [AIRFLOW-8058] Add configurable execution context

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#issuecomment-654299368


   Depends on #9631 





[GitHub] [airflow] kaxil commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r419009784



##########
File path: airflow/models/taskinstance.py
##########
@@ -1110,6 +1117,22 @@ def signal_handler(signum, frame):
             session.merge(self)
         session.commit()
 
+    def get_user_defined_execute_context(self, execution_context):

Review comment:
       Please add docstrings







[GitHub] [airflow] sann05 commented on pull request #8651: [AIRFLOW-8058] Add configurable execution context

Posted by GitBox <gi...@apache.org>.
sann05 commented on pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#issuecomment-848751529


   Does anyone still work on it?





[GitHub] [airflow] jonathanshir commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
jonathanshir commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r421496946



##########
File path: airflow/config_templates/config.yml
##########
@@ -366,6 +366,17 @@
       type: string
       example: "path.to.CustomXCom"
       default: "airflow.models.xcom.BaseXCom"
+    - name: additional_execute_contextmanager
+      description: |
+        Custom user function that returns a context manager. Syntax is "package.method".
+        Context is entered when operator starts executing task. __enter__() will be called
+        before the operator's execute method, and __exit__() shortly after.
+        Function's signature should accept two positional parameters - task instance
+        and execution context
+      version_added: 2.0.0
+      type: string
+      example: ~

Review comment:
       This will break any Airflow run, because it will try to access `my.path.my_context_manager` every time `execute` is called.







[GitHub] [airflow] ashb commented on pull request #8651: [AIRFLOW-8058] Add configurable execution context

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#issuecomment-858847804


   I'm afraid I'm going to close this as Wont Fix -- it feels very broad and fragile to use this global config setting for the example you've given (GCP project)
   
   This is also not necessary for the original target issue
   
   There is already an `on_execute_callback`, so perhaps a much simpler approach might be to add a matching `on_post_execute_callback` (PR) and then you can use https://airflow.apache.org/docs/apache-airflow/stable/concepts/cluster-policies.html?highlight=policy to set this globally in our install.
   
   Sorry to leave this PR mostly silent for most of a year to then close it, but we can build a simpler solution.
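
As a rough illustration of the alternative suggested above, the sketch below sets `on_execute_callback` from a cluster policy. It assumes the `task_policy(task)` hook would live in `airflow_local_settings.py`; `log_task_start` is a hypothetical callback, and a `SimpleNamespace` stands in for a real operator so the snippet runs without Airflow installed.

```python
from types import SimpleNamespace


def log_task_start(context):
    """Hypothetical callback; Airflow passes the task's context dict."""
    print(f"starting {context['task_instance'].task_id}")


def task_policy(task):
    """Cluster policy hook: applied to every task when DAGs are loaded."""
    # Only set the callback if the DAG author has not provided one.
    if task.on_execute_callback is None:
        task.on_execute_callback = log_task_start


# Stand-in for an operator (assumption: a real BaseOperator exposes the same attribute).
task = SimpleNamespace(task_id="example_task", on_execute_callback=None)
task_policy(task)
task.on_execute_callback({"task_instance": task})
```

This covers only the "before execute" half; the matching post-execute hook mentioned above was a proposal at the time of this comment.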





[GitHub] [airflow] mik-laj edited a comment on pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
mik-laj edited a comment on pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#issuecomment-633973505


   @evgenyshulman My solution allows for two use cases (task context and global context). Your proposal covers only one use case (global context). Is there any reason to keep this context as a global option? Did I miss something? I am not sure which solution is best, and I would like to understand both solutions.





[GitHub] [airflow] turbaszek commented on pull request #8651: [AIRFLOW-8058] Add configurable execution context

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#issuecomment-854395276


   > Does anyone still work on it?
   
   I'm afraid not, would you like to pick it up? I think the case for this change was not that strong at that time.





[GitHub] [airflow] kaxil commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r419011155



##########
File path: tests/task/context/test_user_defined_execute_context.py
##########
@@ -0,0 +1,117 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+import os
+import sys
+from datetime import datetime, timedelta
+from unittest.mock import Mock
+
+import pytest
+
+from airflow import DAG, configuration
+from airflow.exceptions import AirflowException
+from airflow.models import TaskInstance
+from airflow.models.baseoperator import BaseOperator
+
+DEFAULT_ARGS = {
+    "owner": "test",
+    "depends_on_past": True,
+    "start_date": datetime(2020, 4, 22),
+    "retries": 1,
+    "retry_delay": timedelta(minutes=1),
+}
+
+AFFECTED_VALUE = 0
+
+
+@contextlib.contextmanager
+def user_defined_context_func(task_instance, execution_context):  # pylint: disable=W0613
+    global AFFECTED_VALUE  # pylint: disable=W0603

Review comment:
       Why do we need a global value?
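
One possible way to address this, sketched under the assumption that the test only needs to observe that the context was entered and exited around `execute`: record the calls on a `unittest.mock.Mock` instead of mutating an integer through a `global` statement. The name `recorder` is illustrative and not from the PR.

```python
import contextlib
from unittest.mock import Mock, call

# Module-level recorder; no `global` statement is needed because it is never rebound.
recorder = Mock()


@contextlib.contextmanager
def user_defined_context_func(task_instance, execution_context):
    recorder.enter(task_instance)
    try:
        yield
    finally:
        # Runs even if the wrapped block raises.
        recorder.exit(task_instance)


# Stand-in for the operator's execute call inside the context.
with user_defined_context_func("ti", {}):
    recorder.execute()
```

The function still has to be importable by dotted path, so some module-level state remains, but the assertions can then be made against `recorder.mock_calls`.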







[GitHub] [airflow] mik-laj commented on pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#issuecomment-632315068


   I still don't understand why the additional execution context is configured as a global option. In my opinion, each task should be able to have its own separate contexts. In other words, the additional context should be a task parameter, not an option in the global file. This will allow tasks to have different contexts and then use an SSH tunnel or gcloud authorization. Such contexts could be parameterized, so that, for example, one task could use an SSH tunnel to connect to server A while another task connects to server B.
   
   If someone wants one global context, it will also be possible by defining the cluster policy.
   https://airflow.readthedocs.io/en/latest/concepts.html#cluster-policy
   
   I propose that the context be defined at the task level.
   
   The example context with parameters will look like this.
   ```python
   from contextlib import contextmanager
   
   
   def ssh_tunnel(local_port, remote_port):
   	@contextmanager
   	def ssh_tunnel_context():
   		from sshtunnel import SSHTunnelForwarder
   
   		server = SSHTunnelForwarder(
   		    'pahaz.urfuclub.ru',
   		    ssh_username="pahaz",
   		    ssh_password="secret",
   		    remote_bind_address=('127.0.0.1', remote_port),
   		    local_bind_address=('127.0.0.1', local_port)
   		)
   		try:
   			server.start()
   			yield
   		finally:
   			server.stop()
   	return ssh_tunnel_context
   ```
   
   The example task will look like this.
   ```python
   task_a = MySQLExecuteQueryOperator(
   	task_id='execute_query',
   	execution_contexts=[
   		ssh_tunnel(3306, 3307),
   	]
   )
   ```
   
   If you want to define a context that will be used by all operators, you can define the cluster policy as follows.
   ```python
   def my_task_policy(task):
   	task.execution_contexts.append(my_global_task_context())
   ```
   
   What do you think about my proposal? Will it meet your requirements?
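
To make the proposal above more concrete, here is a minimal sketch of how an operator might enter a list of `execution_contexts` around its execute call. It assumes each entry is a zero-argument callable that returns a context manager (as `ssh_tunnel(...)` does in the example); `named_context` and `run_with_contexts` are hypothetical helpers, not Airflow APIs.

```python
from contextlib import ExitStack, contextmanager

log = []


def named_context(name):
    """Factory returning a context manager function, like ssh_tunnel() above."""
    @contextmanager
    def _context():
        log.append(f"enter {name}")
        try:
            yield
        finally:
            log.append(f"exit {name}")
    return _context


def run_with_contexts(execution_contexts, execute):
    """Enter each context in order, call execute(), then exit in reverse order."""
    with ExitStack() as stack:
        for make_context in execution_contexts:
            stack.enter_context(make_context())
        return execute()


result = run_with_contexts(
    [named_context("tunnel"), named_context("auth")],
    lambda: log.append("execute") or "done",
)
```

`ExitStack` unwinds the contexts in reverse order even if `execute()` raises, which is the behavior one would probably want for tunnels and credentials.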





[GitHub] [airflow] turbaszek commented on pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#issuecomment-633623365


   @evgenyshulman I think task-level context + cluster policy can provide the same functionality as the global context manager. In that way more situations would be addressed. WDYT?





[GitHub] [airflow] jonathanshir edited a comment on pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

Posted by GitBox <gi...@apache.org>.
jonathanshir edited a comment on pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#issuecomment-624005459


   > I don't really understand it yet. Could you provide a practical example?

   @BasPH 
   So let's start with what exists today to give users more visibility into and control over Airflow's flow:
   The easiest way to hook into Airflow's compile-time code is the "Task Policy".
   In `airflow.settings.policy` you can see that users can define their own policy, which is called for every task object in every executing DAG. This gives the user the option to alter task objects at compile time.
   Keeping this in mind, there is currently no real way to alter Airflow variables during execution.
   Our use case at databand.ai is basically validating and running checks on every single task before it is executed, without changing every single Airflow operator!
   It is important to mention that I think some confusion may have arisen from the "bad name" in the configuration:
   This feature only allows you to define a context manager that is entered before a task is executed. It **does not** alter Airflow's context in any way; it only allows the user to do so. We have seen no clear use case for "replacing" Airflow's context, so this is only an **addition**, hence we will be renaming the feature to `additional_execute_contextmanager` instead of `user_defined_execute_context`.
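
For readers who want a practical picture, the following is a minimal sketch of the kind of function the proposed `additional_execute_contextmanager` option would point to: it accepts the task instance and the execution context, runs a check before `execute`, and cleans up afterwards. The function name, the check, and the `events` list are illustrative assumptions, and a `SimpleNamespace` stands in for a real `TaskInstance`.

```python
import contextlib
from types import SimpleNamespace

events = []


@contextlib.contextmanager
def validate_before_execute(task_instance, execution_context):
    """Hypothetical hook: run a check before execute() and clean up afterwards."""
    # Pre-execute check; raising here would stop the task before it runs.
    if not task_instance.task_id:
        raise ValueError("task_id must not be empty")
    events.append(("enter", task_instance.task_id))
    try:
        yield
    finally:
        events.append(("exit", task_instance.task_id))


ti = SimpleNamespace(task_id="example_task")  # stand-in for a TaskInstance
with validate_before_execute(ti, {"ds": "2020-04-30"}):
    # This is where the operator's execute method would run.
    events.append(("execute", ti.task_id))
```

Under the PR as proposed, the option would hold the dotted path to such a function, for example `my_package.validate_before_execute` (a made-up path).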

