Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/06/18 20:49:36 UTC

[GitHub] [airflow] feluelle opened a new pull request #9394: Refactor PythonVirtualenvOperator

feluelle opened a new pull request #9394:
URL: https://github.com/apache/airflow/pull/9394


   This PR refactors the PythonVirtualenvOperator.
   
   It also includes a quality-of-life improvement: if `use_dill` is `True`, you no longer need to pass `dill` manually as a requirement.
   The operator therefore no longer raises an error if you forget to add it.
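A minimal sketch of the requirement handling described above, modelled on the new `__init__` lines in this PR's diff. `resolve_requirements` is a hypothetical helper name used only for illustration, not part of Airflow.

```python
from typing import List, Optional, Sequence


def resolve_requirements(
    requirements: Optional[Sequence[str]],
    use_dill: bool,
    system_site_packages: bool,
) -> List[str]:
    """Return a copy of the requirements, adding 'dill' when the venv will need it."""
    # Copy so the caller's list (or tuple) is never mutated.
    resolved = list(requirements or [])
    # Without access to host packages, dill must be installed in the venv itself.
    if use_dill and not system_site_packages and 'dill' not in resolved:
        resolved.append('dill')
    return resolved


print(resolve_requirements(['pandas'], use_dill=True, system_site_packages=False))  # ['pandas', 'dill']
```

Like the check in the diff, this uses an exact membership test, so a pinned entry such as `dill==0.3.2` is not recognised and an unpinned `dill` would be appended alongside it; the removed code instead matched entries case-insensitively by prefix.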
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r452896934



##########
File path: airflow/operators/python.py
##########
@@ -413,140 +463,89 @@ def __init__(  # pylint: disable=too-many-arguments
             templates_exts=templates_exts,
             *args,
             **kwargs)
-        self.requirements = requirements or []
+        self.requirements = list(requirements or [])
         self.string_args = string_args or []
         self.python_version = python_version
         self.use_dill = use_dill
         self.system_site_packages = system_site_packages
-        # check that dill is present if needed
-        dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
-                                   self.requirements)
-        if (not system_site_packages) and use_dill and not any(dill_in_requirements):
-            raise AirflowException('If using dill, dill must be in the environment ' +
-                                   'either via system_site_packages or requirements')
-        # check that a function is passed, and that it is not a lambda
-        if (not isinstance(self.python_callable,
-                           types.FunctionType) or (self.python_callable.__name__ ==
-                                                   (lambda x: 0).__name__)):
-            raise AirflowException('{} only supports functions for python_callable arg'.format(
-                self.__class__.__name__))
-        # check that args are passed iff python major version matches
-        if (python_version is not None and
-           str(python_version)[0] != str(sys.version_info[0]) and
-           self._pass_op_args()):
-            raise AirflowException("Passing op_args or op_kwargs is not supported across "
-                                   "different Python major versions "
-                                   "for PythonVirtualenvOperator. "
-                                   "Please use string_args.")
+        if not self.system_site_packages and self.use_dill and 'dill' not in self.requirements:
+            self.requirements.append('dill')
+        self.pickling_library = dill if self.use_dill else pickle
+
+    def execute(self, context: Dict):
+        serializable_context = {key: context[key] for key in self._get_serializable_context_keys()}
+        super().execute(context=serializable_context)
 
     def execute_callable(self):
         with TemporaryDirectory(prefix='venv') as tmp_dir:
             if self.templates_dict:
                 self.op_kwargs['templates_dict'] = self.templates_dict
-            # generate filenames
+
             input_filename = os.path.join(tmp_dir, 'script.in')
             output_filename = os.path.join(tmp_dir, 'script.out')
             string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
             script_filename = os.path.join(tmp_dir, 'script.py')
 
-            # set up virtualenv
-            python_bin = 'python' + str(self.python_version) if self.python_version else None
             prepare_virtualenv(
                 venv_directory=tmp_dir,
-                python_bin=python_bin,
+                python_bin=f'python{self.python_version}' if self.python_version else None,
                 system_site_packages=self.system_site_packages,
-                requirements=self.requirements,
+                requirements=self.requirements
             )
 
             self._write_args(input_filename)
-            self._write_script(script_filename)
             self._write_string_args(string_args_filename)
+            write_python_script(
+                jinja_context=dict(
+                    op_args=self.op_args,
+                    op_kwargs=self.op_kwargs,
+                    pickling_library=self.pickling_library.__name__,
+                    python_callable=self.python_callable.__name__,
+                    python_callable_source=dedent(inspect.getsource(self.python_callable))
+                ),
+                filename=script_filename
+            )
+
+            execute_in_subprocess(cmd=[
+                f'{tmp_dir}/bin/python',
+                script_filename,
+                input_filename,
+                output_filename,
+                string_args_filename
+            ])
 
-            # execute command in virtualenv
-            execute_in_subprocess(
-                self._generate_python_cmd(tmp_dir,
-                                          script_filename,
-                                          input_filename,
-                                          output_filename,
-                                          string_args_filename))
             return self._read_result(output_filename)
 
-    def _pass_op_args(self):
-        # we should only pass op_args if any are given to us
-        return len(self.op_args) + len(self.op_kwargs) > 0
+    def _write_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            with open(filename, 'wb') as file:
+                self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
+
+    def _get_serializable_context_keys(self):
+        def _is_airflow_env():
+            return self.system_site_packages or 'apache-airflow' in self.requirements
+
+        def _is_pendulum_env():
+            return 'pendulum' in self.requirements and 'lazy_object_proxy' in self.requirements
+
+        serializable_context_keys = self.BASE_SERIALIZABLE_CONTEXT_KEYS.copy()
+        if _is_airflow_env():
+            serializable_context_keys.update(self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS)
+        if _is_pendulum_env() or _is_airflow_env():
+            serializable_context_keys.update(self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS)

Review comment:
       Oh I see. Would this be more readable?
   ```python
   if _is_airflow_env():
       serializable_context_keys.update(self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS)
       serializable_context_keys.update(self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS)
   elif _is_pendulum_env():
       serializable_context_keys.update(self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS)
   ```
   Because if `_is_airflow_env()` is true, both updates are performed; the single update happens only when `_is_airflow_env()` is false and `_is_pendulum_env()` is true.
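The claim that the `if`/`elif` version behaves like the two independent `if` blocks in the diff can be checked exhaustively, since there are only four input combinations. A minimal sketch, assuming hypothetical stand-in key sets in place of the class-level `*_SERIALIZABLE_CONTEXT_KEYS` constants:

```python
from itertools import product

# Hypothetical stand-ins for the class-level key sets referenced in the diff.
BASE_KEYS = {'ds', 'ts'}
AIRFLOW_KEYS = {'dag', 'task'}
PENDULUM_KEYS = {'execution_date'}


def keys_original(is_airflow: bool, is_pendulum: bool) -> set:
    # Mirrors the diff: two independent `if` blocks.
    keys = BASE_KEYS.copy()
    if is_airflow:
        keys.update(AIRFLOW_KEYS)
    if is_pendulum or is_airflow:
        keys.update(PENDULUM_KEYS)
    return keys


def keys_if_elif(is_airflow: bool, is_pendulum: bool) -> set:
    # Mirrors the proposed if/elif version.
    keys = BASE_KEYS.copy()
    if is_airflow:
        keys.update(AIRFLOW_KEYS)
        keys.update(PENDULUM_KEYS)
    elif is_pendulum:
        keys.update(PENDULUM_KEYS)
    return keys


# Compare both versions over all four (is_airflow, is_pendulum) combinations.
for is_airflow, is_pendulum in product([False, True], repeat=2):
    assert keys_original(is_airflow, is_pendulum) == keys_if_elif(is_airflow, is_pendulum)
print('equivalent')
```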







[GitHub] [airflow] feluelle commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r455751413



##########
File path: tests/operators/test_python.py
##########
@@ -1222,23 +1230,42 @@ def f(templates_dict):
         self._run_as_operator(f, templates_dict={'ds': '{{ ds }}'})
 
     def test_airflow_context(self):
-        def f(**context):
-            # not serializable
-            assert 'var' not in context
-            assert 'task_instance' not in context
-            assert 'ti' not in context
-            # require airflow
-            assert 'conf' in context
-            assert 'dag' in context
-            assert 'dag_run' in context
-            assert 'macros' in context
-            assert 'task' in context
-            # require airflow or pendulum
-            assert 'execution_date' in context
-            assert 'next_execution_date' in context
-            assert 'prev_execution_date' in context
-            assert 'prev_execution_date_success' in context
-            assert 'prev_start_date_success' in context
+        def f(
+            # basic
+            ds_nodash,
+            inlets,
+            next_ds,
+            next_ds_nodash,
+            outlets,
+            params,
+            prev_ds,
+            prev_ds_nodash,
+            run_id,
+            task_instance_key_str,
+            test_mode,
+            tomorrow_ds,
+            tomorrow_ds_nodash,
+            ts,
+            ts_nodash,
+            ts_nodash_with_tz,
+            yesterday_ds,
+            yesterday_ds_nodash,
+            # pendulum-specific
+            execution_date,
+            next_execution_date,
+            prev_execution_date,
+            prev_execution_date_success,
+            prev_start_date_success,
+            # airflow-specific
+            macros,
+            conf,
+            dag,
+            dag_run,
+            task,
+            # other
+            **context
+        ):  # pylint: disable=unused-argument,too-many-arguments,too-many-locals
+            pass

Review comment:
       What would work is using Jinja to generate the tests :D, but I think that is really too much.
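The rewritten `test_airflow_context` replaces the explicit `assert ... in context` checks with Python's own argument binding: each named parameter must be supplied, so a missing context key makes the call fail with `TypeError`, while any extra keys are collected by `**context`. A minimal sketch of that mechanism, assuming the operator passes the serialized context to the callable as keyword arguments; `check_context` and the sample values are hypothetical.

```python
def check_context(ds_nodash, execution_date, **context):
    """Named parameters double as presence checks for those context keys."""
    # Any key not declared above ends up in **context.
    return sorted(context)


full_context = {'ds_nodash': '20200101', 'execution_date': '2020-01-01', 'run_id': 'manual__1'}
print(check_context(**full_context))  # ['run_id']

try:
    check_context(ds_nodash='20200101')  # 'execution_date' is missing
except TypeError as err:
    print(f'binding failed: {err}')
```

Note that this style only checks that keys are present; it does not check that keys such as `var` or `ti` are absent.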







[GitHub] [airflow] feluelle commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r455748785



##########
File path: tests/operators/test_python.py
##########
@@ -1222,23 +1230,42 @@ def f(templates_dict):
         self._run_as_operator(f, templates_dict={'ds': '{{ ds }}'})
 
     def test_airflow_context(self):
-        def f(**context):
-            # not serializable
-            assert 'var' not in context
-            assert 'task_instance' not in context
-            assert 'ti' not in context
-            # require airflow
-            assert 'conf' in context
-            assert 'dag' in context
-            assert 'dag_run' in context
-            assert 'macros' in context
-            assert 'task' in context
-            # require airflow or pendulum
-            assert 'execution_date' in context
-            assert 'next_execution_date' in context
-            assert 'prev_execution_date' in context
-            assert 'prev_execution_date_success' in context
-            assert 'prev_start_date_success' in context
+        def f(
+            # basic
+            ds_nodash,
+            inlets,
+            next_ds,
+            next_ds_nodash,
+            outlets,
+            params,
+            prev_ds,
+            prev_ds_nodash,
+            run_id,
+            task_instance_key_str,
+            test_mode,
+            tomorrow_ds,
+            tomorrow_ds_nodash,
+            ts,
+            ts_nodash,
+            ts_nodash_with_tz,
+            yesterday_ds,
+            yesterday_ds_nodash,
+            # pendulum-specific
+            execution_date,
+            next_execution_date,
+            prev_execution_date,
+            prev_execution_date_success,
+            prev_start_date_success,
+            # airflow-specific
+            macros,
+            conf,
+            dag,
+            dag_run,
+            task,
+            # other
+            **context
+        ):  # pylint: disable=unused-argument,too-many-arguments,too-many-locals
+            pass

Review comment:
       Good point, but I think this would make it more complex. Of course we could save a few lines, but as it is right now it is pretty straightforward to see what is going on. In my opinion, using `parameterized.expand` would add complexity that is not worth the lines we would save.







[GitHub] [airflow] turbaszek commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r444039647



##########
File path: airflow/operators/python.py
##########
@@ -269,140 +281,132 @@ def __init__(  # pylint: disable=too-many-arguments
             templates_exts=templates_exts,
             *args,
             **kwargs)
-        self.requirements = requirements or []
+        self.requirements = list(requirements or [])
         self.string_args = string_args or []
         self.python_version = python_version
         self.use_dill = use_dill
         self.system_site_packages = system_site_packages
-        # check that dill is present if needed
-        dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
-                                   self.requirements)
-        if (not system_site_packages) and use_dill and not any(dill_in_requirements):
-            raise AirflowException('If using dill, dill must be in the environment ' +
-                                   'either via system_site_packages or requirements')
-        # check that a function is passed, and that it is not a lambda
-        if (not isinstance(self.python_callable,
-                           types.FunctionType) or (self.python_callable.__name__ ==
-                                                   (lambda x: 0).__name__)):
-            raise AirflowException('{} only supports functions for python_callable arg'.format(
-                self.__class__.__name__))
-        # check that args are passed iff python major version matches
-        if (python_version is not None and
-           str(python_version)[0] != str(sys.version_info[0]) and
-           self._pass_op_args()):
-            raise AirflowException("Passing op_args or op_kwargs is not supported across "
-                                   "different Python major versions "
-                                   "for PythonVirtualenvOperator. "
-                                   "Please use string_args.")
+        if not self.system_site_packages and self.use_dill and 'dill' not in self.requirements:
+            self.requirements.append('dill')
+        self.pickling_library = dill if self.use_dill else pickle
 
     def execute_callable(self):
         with TemporaryDirectory(prefix='venv') as tmp_dir:
             if self.templates_dict:
                 self.op_kwargs['templates_dict'] = self.templates_dict
-            # generate filenames
+
             input_filename = os.path.join(tmp_dir, 'script.in')
             output_filename = os.path.join(tmp_dir, 'script.out')
             string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
             script_filename = os.path.join(tmp_dir, 'script.py')
 
-            # set up virtualenv
-            python_bin = 'python' + str(self.python_version) if self.python_version else None
             prepare_virtualenv(
                 venv_directory=tmp_dir,
-                python_bin=python_bin,
+                python_bin=f'python{self.python_version}' if self.python_version else None,
                 system_site_packages=self.system_site_packages,
-                requirements=self.requirements,
+                requirements=self.requirements
             )
 
             self._write_args(input_filename)
-            self._write_script(script_filename)
             self._write_string_args(string_args_filename)
+            self._write_script(script_filename)
+
+            execute_in_subprocess(cmd=[
+                f'{tmp_dir}/bin/python',
+                script_filename,
+                input_filename,
+                output_filename,
+                string_args_filename
+            ])
 
-            # execute command in virtualenv
-            execute_in_subprocess(
-                self._generate_python_cmd(tmp_dir,
-                                          script_filename,
-                                          input_filename,
-                                          output_filename,
-                                          string_args_filename))
             return self._read_result(output_filename)
 
-    def _pass_op_args(self):
-        # we should only pass op_args if any are given to us
-        return len(self.op_args) + len(self.op_kwargs) > 0
+    def _write_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            if self.op_kwargs:
+                # some items from context can't be loaded in virtual env
+                self._keep_serializable_op_kwargs()
+            print(self.op_kwargs)
+            with open(filename, 'wb') as file:
+                self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
+
+    def _keep_serializable_op_kwargs(self):
+        # Remove unserializable objects
+        # otherwise "KeyError: 'Variable __getstate__ does not exist'" would be raised.
+        self.op_kwargs.pop('var', None)
+        # otherwise "TypeError: cannot serialize '_io.FileIO' object" would be raised.
+        self.op_kwargs.pop('task_instance', None)
+        self.op_kwargs.pop('ti', None)
+
+        if self.system_site_packages or 'apache-airflow' in self.requirements:
+            # All can be serialized expecting it to run in an airflow env.
+            return
+
+        # Not access to host packages and no apache-airflow installed.
+        # Remove airflow specific context
+        # otherwise "ModuleNotFoundError: No module named 'airflow'" would be raised.
+        self.op_kwargs.pop('macros', None)
+        self.op_kwargs.pop('conf', None)
+        self.op_kwargs.pop('dag', None)
+        self.op_kwargs.pop('dag_run', None)
+        self.op_kwargs.pop('task', None)
+
+        if 'pendulum' in self.requirements and 'lazy_object_proxy' in self.requirements:
+            # ..but pendulum is installed so keep pendulum date objects
+            # Note: 'lazy_object_proxy' is needed to work.
+            return
+
+        # No pendulum is installed either. So remove pendulum specific context.
+        # otherwise "ModuleNotFoundError: No module named 'pendulum'" would be raised.
+        self.op_kwargs.pop('execution_date', None)
+        self.op_kwargs.pop('next_execution_date', None)
+        self.op_kwargs.pop('prev_execution_date', None)
+        self.op_kwargs.pop('prev_execution_date_success', None)
+        self.op_kwargs.pop('prev_start_date_success', None)
 
     def _write_string_args(self, filename):
-        # writes string_args to a file, which are read line by line
         with open(filename, 'w') as file:
             file.write('\n'.join(map(str, self.string_args)))
 
-    def _write_args(self, input_filename):
-        # serialize args to file
-        if self._pass_op_args():
-            with open(input_filename, 'wb') as file:
-                arg_dict = ({'args': self.op_args, 'kwargs': self.op_kwargs})
-                if self.use_dill:
-                    dill.dump(arg_dict, file)
-                else:
-                    pickle.dump(arg_dict, file)
-
-    def _read_result(self, output_filename):
-        if os.stat(output_filename).st_size == 0:
+    def _code__imports(self):
+        return f'import {self.pickling_library.__name__}\n' \
+               'import sys\n'
+
+    def _code__read_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            return f'with open({filename}, "rb") as file:\n' \
+                   f'    arg_dict = {self.pickling_library.__name__}.load(file)\n'
+        return 'arg_dict = {"args": [], "kwargs": {}}\n'
+
+    def _code__read_string_args(self, filename):
+        return f'with open({filename}, "r") as file:\n' \
+               '    virtualenv_string_args = list(map(lambda x: x.strip(), list(file)))\n'
+
+    def _code__write_output(self, filename):
+        return f'with open({filename}, "wb") as file:\n' \
+               f'    if res: {self.pickling_library.__name__}.dump(res, file)\n'
+
+    def _code__call_script(self, arg_dict):
+        return f'{dedent(inspect.getsource(self.python_callable))}\n' \
+               f'res = {self.python_callable.__name__}(*{arg_dict}["args"], **{arg_dict}["kwargs"])\n'
+
+    def _write_script(self, filename):
+        with open(filename, 'w') as file:
+            python_code = f"{self._code__imports()}" \
+                          f"{self._code__read_args(filename='sys.argv[1]')}" \
+                          f"{self._code__read_string_args(filename='sys.argv[3]')}" \
+                          f"{self._code__call_script(arg_dict='arg_dict')}" \
+                          f"{self._code__write_output(filename='sys.argv[2]')}"
+            self.log.debug('Writing code to file\n %s', python_code)
+            file.write(python_code)

Review comment:
       In airflow-munchkin we render a template and save it to a file:
   https://github.com/PolideaInternal/airflow-munchkin/blob/2de8bff30b342f11e260dbfa8f7725c74731adda/airflow_munchkin/discovery_parser/renderers.py#L21-L25
   
   https://github.com/PolideaInternal/airflow-munchkin/blob/2de8bff30b342f11e260dbfa8f7725c74731adda/airflow_munchkin/block_renderer/template_utils.py#L28-L32
   
   I think being able to take a look at the whole structure of the file is quite helpful. 
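A minimal sketch of the render-then-write approach, applied to the script that `PythonVirtualenvOperator` generates. Assumptions: `string.Template` from the standard library stands in for Jinja (a later revision of this PR calls a `write_python_script(jinja_context=..., filename=...)` helper instead); the template text is a hypothetical reconstruction of the `_code__*` helpers above; `render_script`, `run_rendered_script`, and the `add` callable are illustrative names; and `sys.executable` stands in for the virtualenv's `bin/python`.

```python
import os
import pickle
import subprocess
import sys
import tempfile
from string import Template

# Hypothetical script template modelled on the _code__* helpers in the diff.
# $name placeholders are filled in by Template.substitute().
SCRIPT_TEMPLATE = Template('''\
import $pickling_library
import sys

# Read the pickled positional and keyword arguments.
with open(sys.argv[1], "rb") as file:
    arg_dict = $pickling_library.load(file)

$python_callable_source
res = $python_callable(*arg_dict["args"], **arg_dict["kwargs"])

# Write the pickled return value for the parent process.
with open(sys.argv[2], "wb") as file:
    $pickling_library.dump(res, file)
''')

# Source of a hypothetical user callable, kept as a string for the example.
CALLABLE_SOURCE = '''\
def add(a, b=0):
    return a + b
'''


def render_script(pickling_library: str, callable_name: str, callable_source: str) -> str:
    """Render the whole script so its structure can be read in one place."""
    return SCRIPT_TEMPLATE.substitute(
        pickling_library=pickling_library,
        python_callable=callable_name,
        python_callable_source=callable_source,
    )


def run_rendered_script(op_args: list, op_kwargs: dict):
    """Write args and script to a temp dir, run the script, and read the result back."""
    with tempfile.TemporaryDirectory(prefix='venv') as tmp_dir:
        script_filename = os.path.join(tmp_dir, 'script.py')
        input_filename = os.path.join(tmp_dir, 'script.in')
        output_filename = os.path.join(tmp_dir, 'script.out')
        with open(script_filename, 'w') as file:
            file.write(render_script('pickle', 'add', CALLABLE_SOURCE))
        with open(input_filename, 'wb') as file:
            pickle.dump({'args': op_args, 'kwargs': op_kwargs}, file)
        # sys.executable stands in for the virtualenv's bin/python here.
        subprocess.run([sys.executable, script_filename, input_filename, output_filename], check=True)
        with open(output_filename, 'rb') as file:
            return pickle.load(file)


print(run_rendered_script([2], {'b': 3}))  # 5
```

The template keeps the generated script readable as a whole. The trade-off is that substitution is purely textual, so only names and source strings go into the template, and argument values still travel through the pickled input file.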







[GitHub] [airflow] turbaszek commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r444023173



##########
File path: airflow/operators/python.py
##########
@@ -261,6 +265,14 @@ def __init__(  # pylint: disable=too-many-arguments
         *args,
         **kwargs
     ):
+        if (
+            not isinstance(python_callable, types.FunctionType) or
+            isinstance(python_callable, types.LambdaType) and python_callable.__name__ == "<lambda>"
+        ):
+            raise AirflowException('PythonVirtualenvOperator only supports functions for python_callable arg')
+        if python_version and str(python_version)[0] != str(sys.version_info[0]) and (op_args or op_kwargs):

Review comment:
       ```suggestion
           if python_version and int(str(python_version)[0]) != sys.version_info.major and (op_args or op_kwargs):
   ```
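One detail to check with this comparison: `sys.version_info.major` is an `int`, while `str(python_version)[0]` is a one-character string, and in Python a string never compares equal to an int, so the two sides must be converted to the same type. A minimal sketch of a like-for-like comparison; `major_version_differs` is a hypothetical helper name.

```python
import sys


def major_version_differs(python_version) -> bool:
    """Return True if python_version names a different major version than the running interpreter."""
    # Convert the leading character to int so both sides are ints; '3' != 3 is always True.
    return int(str(python_version)[0]) != sys.version_info.major


print('3' != 3)  # True
print(major_version_differs(sys.version_info.major))  # False
```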







[GitHub] [airflow] turbaszek commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r452831967



##########
File path: airflow/operators/python.py
##########
@@ -413,140 +463,89 @@ def __init__(  # pylint: disable=too-many-arguments
             templates_exts=templates_exts,
             *args,
             **kwargs)
-        self.requirements = requirements or []
+        self.requirements = list(requirements or [])
         self.string_args = string_args or []
         self.python_version = python_version
         self.use_dill = use_dill
         self.system_site_packages = system_site_packages
-        # check that dill is present if needed
-        dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
-                                   self.requirements)
-        if (not system_site_packages) and use_dill and not any(dill_in_requirements):
-            raise AirflowException('If using dill, dill must be in the environment ' +
-                                   'either via system_site_packages or requirements')
-        # check that a function is passed, and that it is not a lambda
-        if (not isinstance(self.python_callable,
-                           types.FunctionType) or (self.python_callable.__name__ ==
-                                                   (lambda x: 0).__name__)):
-            raise AirflowException('{} only supports functions for python_callable arg'.format(
-                self.__class__.__name__))
-        # check that args are passed iff python major version matches
-        if (python_version is not None and
-           str(python_version)[0] != str(sys.version_info[0]) and
-           self._pass_op_args()):
-            raise AirflowException("Passing op_args or op_kwargs is not supported across "
-                                   "different Python major versions "
-                                   "for PythonVirtualenvOperator. "
-                                   "Please use string_args.")
+        if not self.system_site_packages and self.use_dill and 'dill' not in self.requirements:
+            self.requirements.append('dill')
+        self.pickling_library = dill if self.use_dill else pickle
+
+    def execute(self, context: Dict):
+        serializable_context = {key: context[key] for key in self._get_serializable_context_keys()}
+        super().execute(context=serializable_context)
 
     def execute_callable(self):
         with TemporaryDirectory(prefix='venv') as tmp_dir:
             if self.templates_dict:
                 self.op_kwargs['templates_dict'] = self.templates_dict
-            # generate filenames
+
             input_filename = os.path.join(tmp_dir, 'script.in')
             output_filename = os.path.join(tmp_dir, 'script.out')
             string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
             script_filename = os.path.join(tmp_dir, 'script.py')
 
-            # set up virtualenv
-            python_bin = 'python' + str(self.python_version) if self.python_version else None
             prepare_virtualenv(
                 venv_directory=tmp_dir,
-                python_bin=python_bin,
+                python_bin=f'python{self.python_version}' if self.python_version else None,
                 system_site_packages=self.system_site_packages,
-                requirements=self.requirements,
+                requirements=self.requirements
             )
 
             self._write_args(input_filename)
-            self._write_script(script_filename)
             self._write_string_args(string_args_filename)
+            write_python_script(
+                jinja_context=dict(
+                    op_args=self.op_args,
+                    op_kwargs=self.op_kwargs,
+                    pickling_library=self.pickling_library.__name__,
+                    python_callable=self.python_callable.__name__,
+                    python_callable_source=dedent(inspect.getsource(self.python_callable))
+                ),
+                filename=script_filename
+            )
+
+            execute_in_subprocess(cmd=[
+                f'{tmp_dir}/bin/python',
+                script_filename,
+                input_filename,
+                output_filename,
+                string_args_filename
+            ])
 
-            # execute command in virtualenv
-            execute_in_subprocess(
-                self._generate_python_cmd(tmp_dir,
-                                          script_filename,
-                                          input_filename,
-                                          output_filename,
-                                          string_args_filename))
             return self._read_result(output_filename)
 
-    def _pass_op_args(self):
-        # we should only pass op_args if any are given to us
-        return len(self.op_args) + len(self.op_kwargs) > 0
+    def _write_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            with open(filename, 'wb') as file:
+                self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
+
+    def _get_serializable_context_keys(self):
+        def _is_airflow_env():
+            return self.system_site_packages or 'apache-airflow' in self.requirements
+
+        def _is_pendulum_env():
+            return 'pendulum' in self.requirements and 'lazy_object_proxy' in self.requirements
+
+        serializable_context_keys = self.BASE_SERIALIZABLE_CONTEXT_KEYS.copy()
+        if _is_airflow_env():
+            serializable_context_keys.update(self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS)
+        if _is_pendulum_env() or _is_airflow_env():
+            serializable_context_keys.update(self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS)

Review comment:
       ```suggestion
   
           if _is_pendulum_env() or _is_airflow_env():
               serializable_context_keys.update(self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS)
               serializable_context_keys.update(self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS)
   ```
   Should give the same result?
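A quick exhaustive check suggests the merged condition is not equivalent. It also adds the Airflow-specific keys for an environment that has `pendulum` and `lazy_object_proxy` but no Airflow. According to the comments in an earlier revision of this diff, those values raise `ModuleNotFoundError: No module named 'airflow'` when unpickled. A minimal sketch, using hypothetical stand-in key sets:

```python
from itertools import product

# Hypothetical stand-ins for the class-level key sets in the diff.
BASE_KEYS = {'ds'}
AIRFLOW_KEYS = {'dag'}
PENDULUM_KEYS = {'execution_date'}


def keys_original(is_airflow: bool, is_pendulum: bool) -> set:
    # Mirrors the two independent `if` blocks in the diff.
    keys = BASE_KEYS.copy()
    if is_airflow:
        keys.update(AIRFLOW_KEYS)
    if is_pendulum or is_airflow:
        keys.update(PENDULUM_KEYS)
    return keys


def keys_suggested(is_airflow: bool, is_pendulum: bool) -> set:
    # Mirrors the suggestion: both updates under one combined condition.
    keys = BASE_KEYS.copy()
    if is_pendulum or is_airflow:
        keys.update(AIRFLOW_KEYS)
        keys.update(PENDULUM_KEYS)
    return keys


# List the (is_airflow, is_pendulum) inputs where the two versions disagree.
diffs = [(a, p) for a, p in product([False, True], repeat=2)
         if keys_original(a, p) != keys_suggested(a, p)]
print(diffs)  # [(False, True)]
```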







[GitHub] [airflow] feluelle commented on a change in pull request #9394: [WIP] Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r452774942



##########
File path: airflow/operators/python.py
##########
@@ -412,140 +427,113 @@ def __init__(  # pylint: disable=too-many-arguments
             templates_exts=templates_exts,
             *args,
             **kwargs)
-        self.requirements = requirements or []
+        self.requirements = list(requirements or [])
         self.string_args = string_args or []
         self.python_version = python_version
         self.use_dill = use_dill
         self.system_site_packages = system_site_packages
-        # check that dill is present if needed
-        dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
-                                   self.requirements)
-        if (not system_site_packages) and use_dill and not any(dill_in_requirements):
-            raise AirflowException('If using dill, dill must be in the environment ' +
-                                   'either via system_site_packages or requirements')
-        # check that a function is passed, and that it is not a lambda
-        if (not isinstance(self.python_callable,
-                           types.FunctionType) or (self.python_callable.__name__ ==
-                                                   (lambda x: 0).__name__)):
-            raise AirflowException('{} only supports functions for python_callable arg'.format(
-                self.__class__.__name__))
-        # check that args are passed iff python major version matches
-        if (python_version is not None and
-           str(python_version)[0] != str(sys.version_info[0]) and
-           self._pass_op_args()):
-            raise AirflowException("Passing op_args or op_kwargs is not supported across "
-                                   "different Python major versions "
-                                   "for PythonVirtualenvOperator. "
-                                   "Please use string_args.")
+        if not self.system_site_packages and self.use_dill and 'dill' not in self.requirements:
+            self.requirements.append('dill')
+        self.pickling_library = dill if self.use_dill else pickle
 
     def execute_callable(self):
         with TemporaryDirectory(prefix='venv') as tmp_dir:
             if self.templates_dict:
                 self.op_kwargs['templates_dict'] = self.templates_dict
-            # generate filenames
+
             input_filename = os.path.join(tmp_dir, 'script.in')
             output_filename = os.path.join(tmp_dir, 'script.out')
             string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
             script_filename = os.path.join(tmp_dir, 'script.py')
 
-            # set up virtualenv
-            python_bin = 'python' + str(self.python_version) if self.python_version else None
             prepare_virtualenv(
                 venv_directory=tmp_dir,
-                python_bin=python_bin,
+                python_bin=f'python{self.python_version}' if self.python_version else None,
                 system_site_packages=self.system_site_packages,
-                requirements=self.requirements,
+                requirements=self.requirements
             )
 
             self._write_args(input_filename)
-            self._write_script(script_filename)
             self._write_string_args(string_args_filename)
+            self._write_script(script_filename)
+
+            execute_in_subprocess(cmd=[
+                f'{tmp_dir}/bin/python',
+                script_filename,
+                input_filename,
+                output_filename,
+                string_args_filename
+            ])
 
-            # execute command in virtualenv
-            execute_in_subprocess(
-                self._generate_python_cmd(tmp_dir,
-                                          script_filename,
-                                          input_filename,
-                                          output_filename,
-                                          string_args_filename))
             return self._read_result(output_filename)
 
-    def _pass_op_args(self):
-        # we should only pass op_args if any are given to us
-        return len(self.op_args) + len(self.op_kwargs) > 0
+    def _write_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            if self.op_kwargs:
+                # some items from context can't be loaded in virtual env
+                self._keep_serializable_op_kwargs()
+            with open(filename, 'wb') as file:
+                self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
+
+    def _keep_serializable_op_kwargs(self):

Review comment:
       I think it is actually not that hard to do. :)




[GitHub] [airflow] turbaszek commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r455737857



##########
File path: tests/operators/test_python.py
##########
@@ -1222,23 +1230,42 @@ def f(templates_dict):
         self._run_as_operator(f, templates_dict={'ds': '{{ ds }}'})
 
     def test_airflow_context(self):
-        def f(**context):
-            # not serializable
-            assert 'var' not in context
-            assert 'task_instance' not in context
-            assert 'ti' not in context
-            # require airflow
-            assert 'conf' in context
-            assert 'dag' in context
-            assert 'dag_run' in context
-            assert 'macros' in context
-            assert 'task' in context
-            # require airflow or pendulum
-            assert 'execution_date' in context
-            assert 'next_execution_date' in context
-            assert 'prev_execution_date' in context
-            assert 'prev_execution_date_success' in context
-            assert 'prev_start_date_success' in context
+        def f(
+            # basic
+            ds_nodash,
+            inlets,
+            next_ds,
+            next_ds_nodash,
+            outlets,
+            params,
+            prev_ds,
+            prev_ds_nodash,
+            run_id,
+            task_instance_key_str,
+            test_mode,
+            tomorrow_ds,
+            tomorrow_ds_nodash,
+            ts,
+            ts_nodash,
+            ts_nodash_with_tz,
+            yesterday_ds,
+            yesterday_ds_nodash,
+            # pendulum-specific
+            execution_date,
+            next_execution_date,
+            prev_execution_date,
+            prev_execution_date_success,
+            prev_start_date_success,
+            # airflow-specific
+            macros,
+            conf,
+            dag,
+            dag_run,
+            task,
+            # other
+            **context
+        ):  # pylint: disable=unused-argument,too-many-arguments,too-many-locals
+            pass

Review comment:
       Hm, would it be possible to use parameterization of tests?




[GitHub] [airflow] turbaszek commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r444816320



##########
File path: airflow/utils/python_virtualenv_script.tpl
##########
@@ -0,0 +1,23 @@
+import {{ pickling_library }}
+import sys
+
+# Read args
+{% if op_args or op_kwargs %}
+with open(sys.argv[1], "rb") as file:
+    arg_dict = {{ pickling_library }}.load(file)
+{% else %}
+arg_dict = {"args": [], "kwargs": {}}
+{% endif %}
+
+# Read string args
+with open(sys.argv[3], "r") as file:
+    virtualenv_string_args = list(map(lambda x: x.strip(), list(file)))
+
+# Script
+{{ python_callable_source }}
+res = {{ python_callable }}(*arg_dict["args"], **arg_dict["kwargs"])
+
+# Write output
+with open(sys.argv[2], "wb") as file:
+    if res:
+        {{ pickling_library }}.dump(res, file)

Review comment:
       Nice!




[GitHub] [airflow] feluelle commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r444036138



##########
File path: airflow/operators/python.py
##########
@@ -269,140 +281,132 @@ def __init__(  # pylint: disable=too-many-arguments
             templates_exts=templates_exts,
             *args,
             **kwargs)
-        self.requirements = requirements or []
+        self.requirements = list(requirements or [])
         self.string_args = string_args or []
         self.python_version = python_version
         self.use_dill = use_dill
         self.system_site_packages = system_site_packages
-        # check that dill is present if needed
-        dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
-                                   self.requirements)
-        if (not system_site_packages) and use_dill and not any(dill_in_requirements):
-            raise AirflowException('If using dill, dill must be in the environment ' +
-                                   'either via system_site_packages or requirements')
-        # check that a function is passed, and that it is not a lambda
-        if (not isinstance(self.python_callable,
-                           types.FunctionType) or (self.python_callable.__name__ ==
-                                                   (lambda x: 0).__name__)):
-            raise AirflowException('{} only supports functions for python_callable arg'.format(
-                self.__class__.__name__))
-        # check that args are passed iff python major version matches
-        if (python_version is not None and
-           str(python_version)[0] != str(sys.version_info[0]) and
-           self._pass_op_args()):
-            raise AirflowException("Passing op_args or op_kwargs is not supported across "
-                                   "different Python major versions "
-                                   "for PythonVirtualenvOperator. "
-                                   "Please use string_args.")
+        if not self.system_site_packages and self.use_dill and 'dill' not in self.requirements:
+            self.requirements.append('dill')
+        self.pickling_library = dill if self.use_dill else pickle
 
     def execute_callable(self):
         with TemporaryDirectory(prefix='venv') as tmp_dir:
             if self.templates_dict:
                 self.op_kwargs['templates_dict'] = self.templates_dict
-            # generate filenames
+
             input_filename = os.path.join(tmp_dir, 'script.in')
             output_filename = os.path.join(tmp_dir, 'script.out')
             string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
             script_filename = os.path.join(tmp_dir, 'script.py')
 
-            # set up virtualenv
-            python_bin = 'python' + str(self.python_version) if self.python_version else None
             prepare_virtualenv(
                 venv_directory=tmp_dir,
-                python_bin=python_bin,
+                python_bin=f'python{self.python_version}' if self.python_version else None,
                 system_site_packages=self.system_site_packages,
-                requirements=self.requirements,
+                requirements=self.requirements
             )
 
             self._write_args(input_filename)
-            self._write_script(script_filename)
             self._write_string_args(string_args_filename)
+            self._write_script(script_filename)
+
+            execute_in_subprocess(cmd=[
+                f'{tmp_dir}/bin/python',
+                script_filename,
+                input_filename,
+                output_filename,
+                string_args_filename
+            ])
 
-            # execute command in virtualenv
-            execute_in_subprocess(
-                self._generate_python_cmd(tmp_dir,
-                                          script_filename,
-                                          input_filename,
-                                          output_filename,
-                                          string_args_filename))
             return self._read_result(output_filename)
 
-    def _pass_op_args(self):
-        # we should only pass op_args if any are given to us
-        return len(self.op_args) + len(self.op_kwargs) > 0
+    def _write_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            if self.op_kwargs:
+                # some items from context can't be loaded in virtual env
+                self._keep_serializable_op_kwargs()
+            print(self.op_kwargs)
+            with open(filename, 'wb') as file:
+                self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
+
+    def _keep_serializable_op_kwargs(self):
+        # Remove unserializable objects
+        # otherwise "KeyError: 'Variable __getstate__ does not exist'" would be raised.
+        self.op_kwargs.pop('var', None)
+        # otherwise "TypeError: cannot serialize '_io.FileIO' object" would be raised.
+        self.op_kwargs.pop('task_instance', None)
+        self.op_kwargs.pop('ti', None)
+
+        if self.system_site_packages or 'apache-airflow' in self.requirements:
+            # All can be serialized expecting it to run in an airflow env.
+            return
+
+        # Not access to host packages and no apache-airflow installed.
+        # Remove airflow specific context
+        # otherwise "ModuleNotFoundError: No module named 'airflow'" would be raised.
+        self.op_kwargs.pop('macros', None)
+        self.op_kwargs.pop('conf', None)
+        self.op_kwargs.pop('dag', None)
+        self.op_kwargs.pop('dag_run', None)
+        self.op_kwargs.pop('task', None)
+
+        if 'pendulum' in self.requirements and 'lazy_object_proxy' in self.requirements:
+            # ..but pendulum is installed so keep pendulum date objects
+            # Note: 'lazy_object_proxy' is needed to work.
+            return
+
+        # No pendulum is installed either. So remove pendulum specific context.
+        # otherwise "ModuleNotFoundError: No module named 'pendulum'" would be raised.
+        self.op_kwargs.pop('execution_date', None)
+        self.op_kwargs.pop('next_execution_date', None)
+        self.op_kwargs.pop('prev_execution_date', None)
+        self.op_kwargs.pop('prev_execution_date_success', None)
+        self.op_kwargs.pop('prev_start_date_success', None)
 
     def _write_string_args(self, filename):
-        # writes string_args to a file, which are read line by line
         with open(filename, 'w') as file:
             file.write('\n'.join(map(str, self.string_args)))
 
-    def _write_args(self, input_filename):
-        # serialize args to file
-        if self._pass_op_args():
-            with open(input_filename, 'wb') as file:
-                arg_dict = ({'args': self.op_args, 'kwargs': self.op_kwargs})
-                if self.use_dill:
-                    dill.dump(arg_dict, file)
-                else:
-                    pickle.dump(arg_dict, file)
-
-    def _read_result(self, output_filename):
-        if os.stat(output_filename).st_size == 0:
+    def _code__imports(self):
+        return f'import {self.pickling_library.__name__}\n' \
+               'import sys\n'
+
+    def _code__read_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            return f'with open({filename}, "rb") as file:\n' \
+                   f'    arg_dict = {self.pickling_library.__name__}.load(file)\n'
+        return 'arg_dict = {"args": [], "kwargs": {}}\n'
+
+    def _code__read_string_args(self, filename):
+        return f'with open({filename}, "r") as file:\n' \
+               '    virtualenv_string_args = list(map(lambda x: x.strip(), list(file)))\n'
+
+    def _code__write_output(self, filename):
+        return f'with open({filename}, "wb") as file:\n' \
+               f'    if res: {self.pickling_library.__name__}.dump(res, file)\n'
+
+    def _code__call_script(self, arg_dict):
+        return f'{dedent(inspect.getsource(self.python_callable))}\n' \
+               f'res = {self.python_callable.__name__}(*{arg_dict}["args"], **{arg_dict}["kwargs"])\n'
+
+    def _write_script(self, filename):
+        with open(filename, 'w') as file:
+            python_code = f"{self._code__imports()}" \
+                          f"{self._code__read_args(filename='sys.argv[1]')}" \
+                          f"{self._code__read_string_args(filename='sys.argv[3]')}" \
+                          f"{self._code__call_script(arg_dict='arg_dict')}" \
+                          f"{self._code__write_output(filename='sys.argv[2]')}"
+            self.log.debug('Writing code to file\n %s', python_code)
+            file.write(python_code)

Review comment:
       Looks good 👍 How do you use it then?
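The generated script is used by writing the pickled arguments and the string args to files, then running the script with those paths as `sys.argv[1]` (args), `sys.argv[2]` (output) and `sys.argv[3]` (string args), and finally unpickling the output. The sketch below mirrors that round trip using only the standard library. Assumptions: it runs the current interpreter (`sys.executable`) instead of a freshly prepared virtualenv, always uses `pickle` rather than `dill`, receives the callable's source as a string (the operator obtains it with `inspect.getsource`), and `run_callable` is a hypothetical helper name.

```python
import os
import pickle
import subprocess
import sys
import tempfile
from typing import Any, Dict, Optional, Sequence

# Same layout as the generated script: argv[1] = pickled args,
# argv[2] = output file, argv[3] = string args (one per line).
SCRIPT_TEMPLATE = """\
import pickle
import sys

with open(sys.argv[1], "rb") as file:
    arg_dict = pickle.load(file)

with open(sys.argv[3], "r") as file:
    virtualenv_string_args = [line.strip() for line in file]

{source}
res = {name}(*arg_dict["args"], **arg_dict["kwargs"])

with open(sys.argv[2], "wb") as file:
    if res:
        pickle.dump(res, file)
"""


def run_callable(
    source: str,
    name: str,
    op_args: Sequence[Any] = (),
    op_kwargs: Optional[Dict[str, Any]] = None,
    string_args: Sequence[Any] = (),
) -> Any:
    """Hypothetical helper: run `name` (defined in `source`) in a child interpreter."""
    with tempfile.TemporaryDirectory(prefix="venv") as tmp_dir:
        input_filename = os.path.join(tmp_dir, "script.in")
        output_filename = os.path.join(tmp_dir, "script.out")
        string_args_filename = os.path.join(tmp_dir, "string_args.txt")
        script_filename = os.path.join(tmp_dir, "script.py")

        # Write the three inputs the script expects.
        with open(input_filename, "wb") as file:
            pickle.dump({"args": list(op_args), "kwargs": op_kwargs or {}}, file)
        with open(string_args_filename, "w") as file:
            file.write("\n".join(map(str, string_args)))
        with open(script_filename, "w") as file:
            file.write(SCRIPT_TEMPLATE.format(source=source, name=name))

        subprocess.run(
            [sys.executable, script_filename, input_filename,
             output_filename, string_args_filename],
            check=True,
        )

        # An empty output file means the callable returned a falsy value.
        if os.stat(output_filename).st_size == 0:
            return None
        with open(output_filename, "rb") as file:
            return pickle.load(file)


if __name__ == "__main__":
    print(run_callable("def add(a, b):\n    return a + b\n", "add", op_args=[2, 3]))  # 5
```

Because the script only pickles truthy results, a callable returning `0` or an empty list comes back as `None` here, just as it would with the template.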




[GitHub] [airflow] turbaszek commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r452832600



##########
File path: airflow/utils/python_virtualenv_script.jinja2
##########
@@ -0,0 +1,42 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+#}
+
+import {{ pickling_library }}
+import sys
+
+# Read args
+{% if op_args or op_kwargs %}
+with open(sys.argv[1], "rb") as file:
+    arg_dict = {{ pickling_library }}.load(file)
+{% else %}
+arg_dict = {"args": [], "kwargs": {}}
+{% endif %}
+
+# Read string args
+with open(sys.argv[3], "r") as file:
+    virtualenv_string_args = list(map(lambda x: x.strip(), list(file)))
+
+# Script
+{{ python_callable_source }}
+res = {{ python_callable }}(*arg_dict["args"], **arg_dict["kwargs"])
+
+# Write output
+with open(sys.argv[2], "wb") as file:
+    if res:
+        {{ pickling_library }}.dump(res, file)

Review comment:
       I like it!




[GitHub] [airflow] feluelle commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r455750879



##########
File path: tests/operators/test_python.py
##########
@@ -1222,23 +1230,42 @@ def f(templates_dict):
         self._run_as_operator(f, templates_dict={'ds': '{{ ds }}'})
 
     def test_airflow_context(self):
-        def f(**context):
-            # not serializable
-            assert 'var' not in context
-            assert 'task_instance' not in context
-            assert 'ti' not in context
-            # require airflow
-            assert 'conf' in context
-            assert 'dag' in context
-            assert 'dag_run' in context
-            assert 'macros' in context
-            assert 'task' in context
-            # require airflow or pendulum
-            assert 'execution_date' in context
-            assert 'next_execution_date' in context
-            assert 'prev_execution_date' in context
-            assert 'prev_execution_date_success' in context
-            assert 'prev_start_date_success' in context
+        def f(
+            # basic
+            ds_nodash,
+            inlets,
+            next_ds,
+            next_ds_nodash,
+            outlets,
+            params,
+            prev_ds,
+            prev_ds_nodash,
+            run_id,
+            task_instance_key_str,
+            test_mode,
+            tomorrow_ds,
+            tomorrow_ds_nodash,
+            ts,
+            ts_nodash,
+            ts_nodash_with_tz,
+            yesterday_ds,
+            yesterday_ds_nodash,
+            # pendulum-specific
+            execution_date,
+            next_execution_date,
+            prev_execution_date,
+            prev_execution_date_success,
+            prev_start_date_success,
+            # airflow-specific
+            macros,
+            conf,
+            dag,
+            dag_run,
+            task,
+            # other
+            **context
+        ):  # pylint: disable=unused-argument,too-many-arguments,too-many-locals
+            pass

Review comment:
       Actually, I think it isn't even **possible**, because the function is executed in a different context, so I cannot pass variables/parameters into it from outside.
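The effect can be reproduced without the operator. Once a function is reduced to its source text and re-executed in a fresh namespace, any name it takes from the enclosing test scope, such as a parameterized value, is simply missing. This is a minimal sketch: `exec` with an empty globals dict stands in for the separate virtualenv process, and `run_from_source` and `expected_key` are hypothetical names.

```python
import textwrap
from typing import Any, Dict

# Source of a test callable that relies on a name from the enclosing scope,
# e.g. a value that a parameterized test would supply.
F_SOURCE = textwrap.dedent(
    """
    def f(**context):
        return expected_key in context
    """
)


def run_from_source(source: str, name: str, **kwargs: Any) -> Any:
    """Rebuild the callable from its source in an empty global namespace."""
    namespace: Dict[str, Any] = {}
    exec(source, namespace)
    return namespace[name](**kwargs)


expected_key = "ds"  # exists in this module, but is never shipped with F_SOURCE

try:
    run_from_source(F_SOURCE, "f", ds="2020-06-18")
    error = None
except NameError as err:
    error = err

print(repr(error))
```

Only what is in the source text (plus the arguments passed at call time) reaches the isolated callable, which is why values from an outer test scope cannot be used inside it.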




[GitHub] [airflow] Fokko commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
Fokko commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r449418614



##########
File path: airflow/operators/python.py
##########
@@ -412,140 +427,113 @@ def __init__(  # pylint: disable=too-many-arguments
             templates_exts=templates_exts,
             *args,
             **kwargs)
-        self.requirements = requirements or []
+        self.requirements = list(requirements or [])
         self.string_args = string_args or []
         self.python_version = python_version
         self.use_dill = use_dill
         self.system_site_packages = system_site_packages
-        # check that dill is present if needed
-        dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
-                                   self.requirements)
-        if (not system_site_packages) and use_dill and not any(dill_in_requirements):
-            raise AirflowException('If using dill, dill must be in the environment ' +
-                                   'either via system_site_packages or requirements')
-        # check that a function is passed, and that it is not a lambda
-        if (not isinstance(self.python_callable,
-                           types.FunctionType) or (self.python_callable.__name__ ==
-                                                   (lambda x: 0).__name__)):
-            raise AirflowException('{} only supports functions for python_callable arg'.format(
-                self.__class__.__name__))
-        # check that args are passed iff python major version matches
-        if (python_version is not None and
-           str(python_version)[0] != str(sys.version_info[0]) and
-           self._pass_op_args()):
-            raise AirflowException("Passing op_args or op_kwargs is not supported across "
-                                   "different Python major versions "
-                                   "for PythonVirtualenvOperator. "
-                                   "Please use string_args.")
+        if not self.system_site_packages and self.use_dill and 'dill' not in self.requirements:
+            self.requirements.append('dill')
+        self.pickling_library = dill if self.use_dill else pickle
 
     def execute_callable(self):
         with TemporaryDirectory(prefix='venv') as tmp_dir:
             if self.templates_dict:
                 self.op_kwargs['templates_dict'] = self.templates_dict
-            # generate filenames
+
             input_filename = os.path.join(tmp_dir, 'script.in')
             output_filename = os.path.join(tmp_dir, 'script.out')
             string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
             script_filename = os.path.join(tmp_dir, 'script.py')
 
-            # set up virtualenv
-            python_bin = 'python' + str(self.python_version) if self.python_version else None
             prepare_virtualenv(
                 venv_directory=tmp_dir,
-                python_bin=python_bin,
+                python_bin=f'python{self.python_version}' if self.python_version else None,
                 system_site_packages=self.system_site_packages,
-                requirements=self.requirements,
+                requirements=self.requirements
             )
 
             self._write_args(input_filename)
-            self._write_script(script_filename)
             self._write_string_args(string_args_filename)
+            self._write_script(script_filename)
+
+            execute_in_subprocess(cmd=[
+                f'{tmp_dir}/bin/python',
+                script_filename,
+                input_filename,
+                output_filename,
+                string_args_filename
+            ])
 
-            # execute command in virtualenv
-            execute_in_subprocess(
-                self._generate_python_cmd(tmp_dir,
-                                          script_filename,
-                                          input_filename,
-                                          output_filename,
-                                          string_args_filename))
             return self._read_result(output_filename)
 
-    def _pass_op_args(self):
-        # we should only pass op_args if any are given to us
-        return len(self.op_args) + len(self.op_kwargs) > 0
+    def _write_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            if self.op_kwargs:
+                # some items from context can't be loaded in virtual env
+                self._keep_serializable_op_kwargs()
+            with open(filename, 'wb') as file:
+                self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
+
+    def _keep_serializable_op_kwargs(self):

Review comment:
       Now that we're removing all kinds of keys, wouldn't it be preferable to have an allowlist of keys to keep? That might be more stable in the future.
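An allowlist version keeps only known-safe context keys instead of popping the unsafe ones. This is a minimal sketch under several assumptions. The key groups are taken from the argument groups in the `test_airflow_context` signature, so the real constants may hold more keys. The environment checks reuse the conditions from `_keep_serializable_op_kwargs`. `filter_context` is a hypothetical helper that filters the template context only, so user-supplied `op_kwargs` would need to be merged in separately.

```python
from typing import Any, Dict, FrozenSet, List

BASE_SERIALIZABLE_CONTEXT_KEYS = frozenset({
    "ds_nodash", "inlets", "next_ds", "next_ds_nodash", "outlets", "params",
    "prev_ds", "prev_ds_nodash", "run_id", "task_instance_key_str", "test_mode",
    "tomorrow_ds", "tomorrow_ds_nodash", "ts", "ts_nodash", "ts_nodash_with_tz",
    "yesterday_ds", "yesterday_ds_nodash",
})
PENDULUM_SERIALIZABLE_CONTEXT_KEYS = frozenset({
    "execution_date", "next_execution_date", "prev_execution_date",
    "prev_execution_date_success", "prev_start_date_success",
})
AIRFLOW_SERIALIZABLE_CONTEXT_KEYS = frozenset({"macros", "conf", "dag", "dag_run", "task"})


def serializable_context_keys(system_site_packages: bool, requirements: List[str]) -> FrozenSet[str]:
    is_airflow_env = system_site_packages or "apache-airflow" in requirements
    is_pendulum_env = "pendulum" in requirements and "lazy_object_proxy" in requirements
    keys = set(BASE_SERIALIZABLE_CONTEXT_KEYS)
    if is_airflow_env:
        keys |= AIRFLOW_SERIALIZABLE_CONTEXT_KEYS
    if is_airflow_env or is_pendulum_env:
        keys |= PENDULUM_SERIALIZABLE_CONTEXT_KEYS
    return frozenset(keys)


def filter_context(
    context: Dict[str, Any], system_site_packages: bool, requirements: List[str]
) -> Dict[str, Any]:
    # Anything not explicitly allowed (e.g. 'var', 'ti', 'task_instance', or
    # keys added in later versions) is dropped instead of being pickled.
    allowed = serializable_context_keys(system_site_packages, requirements)
    return {key: value for key, value in context.items() if key in allowed}
```

With this shape, a new context key is excluded by default until someone adds it to one of the sets, which is the stability argument made in the comment.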




[GitHub] [airflow] feluelle commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r449449791



##########
File path: airflow/operators/python.py
##########
@@ -412,140 +427,113 @@ def __init__(  # pylint: disable=too-many-arguments
             templates_exts=templates_exts,
             *args,
             **kwargs)
-        self.requirements = requirements or []
+        self.requirements = list(requirements or [])
         self.string_args = string_args or []
         self.python_version = python_version
         self.use_dill = use_dill
         self.system_site_packages = system_site_packages
-        # check that dill is present if needed
-        dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
-                                   self.requirements)
-        if (not system_site_packages) and use_dill and not any(dill_in_requirements):
-            raise AirflowException('If using dill, dill must be in the environment ' +
-                                   'either via system_site_packages or requirements')
-        # check that a function is passed, and that it is not a lambda
-        if (not isinstance(self.python_callable,
-                           types.FunctionType) or (self.python_callable.__name__ ==
-                                                   (lambda x: 0).__name__)):
-            raise AirflowException('{} only supports functions for python_callable arg'.format(
-                self.__class__.__name__))
-        # check that args are passed iff python major version matches
-        if (python_version is not None and
-           str(python_version)[0] != str(sys.version_info[0]) and
-           self._pass_op_args()):
-            raise AirflowException("Passing op_args or op_kwargs is not supported across "
-                                   "different Python major versions "
-                                   "for PythonVirtualenvOperator. "
-                                   "Please use string_args.")
+        if not self.system_site_packages and self.use_dill and 'dill' not in self.requirements:
+            self.requirements.append('dill')
+        self.pickling_library = dill if self.use_dill else pickle
 
     def execute_callable(self):
         with TemporaryDirectory(prefix='venv') as tmp_dir:
             if self.templates_dict:
                 self.op_kwargs['templates_dict'] = self.templates_dict
-            # generate filenames
+
             input_filename = os.path.join(tmp_dir, 'script.in')
             output_filename = os.path.join(tmp_dir, 'script.out')
             string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
             script_filename = os.path.join(tmp_dir, 'script.py')
 
-            # set up virtualenv
-            python_bin = 'python' + str(self.python_version) if self.python_version else None
             prepare_virtualenv(
                 venv_directory=tmp_dir,
-                python_bin=python_bin,
+                python_bin=f'python{self.python_version}' if self.python_version else None,
                 system_site_packages=self.system_site_packages,
-                requirements=self.requirements,
+                requirements=self.requirements
             )
 
             self._write_args(input_filename)
-            self._write_script(script_filename)
             self._write_string_args(string_args_filename)
+            self._write_script(script_filename)
+
+            execute_in_subprocess(cmd=[
+                f'{tmp_dir}/bin/python',
+                script_filename,
+                input_filename,
+                output_filename,
+                string_args_filename
+            ])
 
-            # execute command in virtualenv
-            execute_in_subprocess(
-                self._generate_python_cmd(tmp_dir,
-                                          script_filename,
-                                          input_filename,
-                                          output_filename,
-                                          string_args_filename))
             return self._read_result(output_filename)
 
-    def _pass_op_args(self):
-        # we should only pass op_args if any are given to us
-        return len(self.op_args) + len(self.op_kwargs) > 0
+    def _write_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            if self.op_kwargs:
+                # some items from context can't be loaded in virtual env
+                self._keep_serializable_op_kwargs()
+            with open(filename, 'wb') as file:
+                self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
+
+    def _keep_serializable_op_kwargs(self):

Review comment:
       Good point. 🤔 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] feluelle merged pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
feluelle merged pull request #9394:
URL: https://github.com/apache/airflow/pull/9394


   





[GitHub] [airflow] feluelle commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r444889415



##########
File path: airflow/operators/python.py
##########
@@ -412,140 +427,113 @@ def __init__(  # pylint: disable=too-many-arguments
             templates_exts=templates_exts,
             *args,
             **kwargs)
-        self.requirements = requirements or []
+        self.requirements = list(requirements or [])
         self.string_args = string_args or []
         self.python_version = python_version
         self.use_dill = use_dill
         self.system_site_packages = system_site_packages
-        # check that dill is present if needed
-        dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
-                                   self.requirements)
-        if (not system_site_packages) and use_dill and not any(dill_in_requirements):
-            raise AirflowException('If using dill, dill must be in the environment ' +
-                                   'either via system_site_packages or requirements')
-        # check that a function is passed, and that it is not a lambda
-        if (not isinstance(self.python_callable,
-                           types.FunctionType) or (self.python_callable.__name__ ==
-                                                   (lambda x: 0).__name__)):
-            raise AirflowException('{} only supports functions for python_callable arg'.format(
-                self.__class__.__name__))
-        # check that args are passed iff python major version matches
-        if (python_version is not None and
-           str(python_version)[0] != str(sys.version_info[0]) and
-           self._pass_op_args()):
-            raise AirflowException("Passing op_args or op_kwargs is not supported across "
-                                   "different Python major versions "
-                                   "for PythonVirtualenvOperator. "
-                                   "Please use string_args.")
+        if not self.system_site_packages and self.use_dill and 'dill' not in self.requirements:
+            self.requirements.append('dill')
+        self.pickling_library = dill if self.use_dill else pickle
 
     def execute_callable(self):
         with TemporaryDirectory(prefix='venv') as tmp_dir:
             if self.templates_dict:
                 self.op_kwargs['templates_dict'] = self.templates_dict
-            # generate filenames
+
             input_filename = os.path.join(tmp_dir, 'script.in')
             output_filename = os.path.join(tmp_dir, 'script.out')
             string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
             script_filename = os.path.join(tmp_dir, 'script.py')
 
-            # set up virtualenv
-            python_bin = 'python' + str(self.python_version) if self.python_version else None
             prepare_virtualenv(
                 venv_directory=tmp_dir,
-                python_bin=python_bin,
+                python_bin=f'python{self.python_version}' if self.python_version else None,
                 system_site_packages=self.system_site_packages,
-                requirements=self.requirements,
+                requirements=self.requirements
             )
 
             self._write_args(input_filename)
-            self._write_script(script_filename)
             self._write_string_args(string_args_filename)
+            self._write_script(script_filename)
+
+            execute_in_subprocess(cmd=[
+                f'{tmp_dir}/bin/python',
+                script_filename,
+                input_filename,
+                output_filename,
+                string_args_filename
+            ])
 
-            # execute command in virtualenv
-            execute_in_subprocess(
-                self._generate_python_cmd(tmp_dir,
-                                          script_filename,
-                                          input_filename,
-                                          output_filename,
-                                          string_args_filename))
             return self._read_result(output_filename)
 
-    def _pass_op_args(self):
-        # we should only pass op_args if any are given to us
-        return len(self.op_args) + len(self.op_kwargs) > 0
+    def _write_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            if self.op_kwargs:
+                # some items from context can't be loaded in virtual env
+                self._keep_serializable_op_kwargs()
+            with open(filename, 'wb') as file:
+                self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
+
+    def _keep_serializable_op_kwargs(self):
+        # Remove unserializable objects
+        # otherwise "KeyError: 'Variable __getstate__ does not exist'" would be raised.
+        self.op_kwargs.pop('var', None)
+        # otherwise "TypeError: cannot serialize '_io.FileIO' object" would be raised.
+        self.op_kwargs.pop('task_instance', None)
+        self.op_kwargs.pop('ti', None)
+
+        if self.system_site_packages or 'apache-airflow' in self.requirements:
+            # All can be serialized expecting it to run in an airflow env.
+            return
+
+        # Not access to host packages and no apache-airflow installed.
+        # Remove airflow specific context
+        # otherwise "ModuleNotFoundError: No module named 'airflow'" would be raised.
+        self.op_kwargs.pop('macros', None)
+        self.op_kwargs.pop('conf', None)
+        self.op_kwargs.pop('dag', None)
+        self.op_kwargs.pop('dag_run', None)
+        self.op_kwargs.pop('task', None)
+
+        if 'pendulum' in self.requirements and 'lazy_object_proxy' in self.requirements:
+            # ..but pendulum is installed so keep pendulum date objects
+            # Note: 'lazy_object_proxy' is needed to work.
+            return
+
+        # No pendulum is installed either. So remove pendulum specific context.
+        # otherwise "ModuleNotFoundError: No module named 'pendulum'" would be raised.
+        self.op_kwargs.pop('execution_date', None)
+        self.op_kwargs.pop('next_execution_date', None)
+        self.op_kwargs.pop('prev_execution_date', None)
+        self.op_kwargs.pop('prev_execution_date_success', None)
+        self.op_kwargs.pop('prev_start_date_success', None)
 
     def _write_string_args(self, filename):
-        # writes string_args to a file, which are read line by line
         with open(filename, 'w') as file:
             file.write('\n'.join(map(str, self.string_args)))
 
-    def _write_args(self, input_filename):
-        # serialize args to file
-        if self._pass_op_args():
-            with open(input_filename, 'wb') as file:
-                arg_dict = ({'args': self.op_args, 'kwargs': self.op_kwargs})
-                if self.use_dill:
-                    dill.dump(arg_dict, file)
-                else:
-                    pickle.dump(arg_dict, file)
-
-    def _read_result(self, output_filename):
-        if os.stat(output_filename).st_size == 0:
+    def _write_script(self, filename):
+        with open(filename, 'w') as file:
+            python_code = render_virtualenv_script(
+                jinja_context=dict(
+                    op_args=self.op_args,
+                    op_kwargs=self.op_kwargs,
+                    pickling_library=self.pickling_library.__name__,
+                    python_callable=self.python_callable.__name__,
+                    python_callable_source=dedent(inspect.getsource(self.python_callable))
+                )
+            )
+            self.log.debug('Writing code to file\n %s', python_code)

Review comment:
       I added the `:`, but I think the newline should stay.







[GitHub] [airflow] feluelle commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r449451590



##########
File path: airflow/utils/python_virtualenv.py
##########
@@ -69,3 +72,23 @@ def prepare_virtualenv(
         execute_in_subprocess(pip_cmd)
 
     return '{}/bin/python'.format(venv_directory)
+
+
+def render_virtualenv_script(jinja_context: dict) -> str:
+    """
+    Renders the python script to execute in the virtual environment.
+
+    :param jinja_context: The jinja context variables to unpack and replace with its placeholders in the
+        template file.
+    :type jinja_context: dict
+    :return: the rendered content
+    :rtype: str
+    """
+    template_loader = jinja2.FileSystemLoader(searchpath=os.path.dirname(__file__))
+    template_env = jinja2.Environment(
+        loader=template_loader,
+        undefined=jinja2.StrictUndefined
+    )
+    template = template_env.get_template('python_virtualenv_script.jinja2')
+    content = template.render(**jinja_context)

Review comment:
       I could also use `template.stream(**jinja_context).dump(file)`, so I would not need `open` to create and write to the file.
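For comparison, a minimal sketch of the two approaches, assuming `jinja2` is installed. It uses an in-memory `DictLoader` with a made-up template name (`script.jinja2`) instead of the real `python_virtualenv_script.jinja2` file, and an `io.StringIO` buffer in place of a real file; `TemplateStream.dump` accepts either a file object or a filename.

```python
import io

import jinja2

# Hypothetical stand-in for python_virtualenv_script.jinja2.
TEMPLATES = {
    "script.jinja2": "import {{ pickling_library }}\n{{ python_callable_source }}",
}

env = jinja2.Environment(
    loader=jinja2.DictLoader(TEMPLATES),
    undefined=jinja2.StrictUndefined,  # a missing variable raises instead of rendering ''
)
template = env.get_template("script.jinja2")
context = {
    "pickling_library": "pickle",
    "python_callable_source": "def f():\n    return 42",
}

# Option 1: render to a string, then write it out yourself (e.g. via open()).
rendered = template.render(**context)

# Option 2: stream the output straight into a file object (or a filename).
buffer = io.StringIO()
template.stream(**context).dump(buffer)

# With StrictUndefined, a forgotten variable fails loudly.
try:
    template.render(pickling_library="pickle")
    strict_raised = False
except jinja2.UndefinedError:
    strict_raised = True
```

Both options produce the same text; `stream().dump()` just saves the explicit `open`/`write` step and writes chunks as they are rendered.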







[GitHub] [airflow] maganaluis commented on pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
maganaluis commented on pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#issuecomment-666708165


   @feluelle Thank you for adding this fix. Will this change work in the 1.10 branch? We forked the repo into GitHub Enterprise and added the fix there, so I'm fine with closing the issue and PR. I'm interested in moving to Airflow 2.0 because of the REST API, so I'm looking forward to it. 






[GitHub] [airflow] feluelle commented on a change in pull request #9394: [WIP] Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r452768008



##########
File path: airflow/operators/python.py
##########
@@ -412,140 +427,113 @@ def __init__(  # pylint: disable=too-many-arguments
             templates_exts=templates_exts,
             *args,
             **kwargs)
-        self.requirements = requirements or []
+        self.requirements = list(requirements or [])
         self.string_args = string_args or []
         self.python_version = python_version
         self.use_dill = use_dill
         self.system_site_packages = system_site_packages
-        # check that dill is present if needed
-        dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
-                                   self.requirements)
-        if (not system_site_packages) and use_dill and not any(dill_in_requirements):
-            raise AirflowException('If using dill, dill must be in the environment ' +
-                                   'either via system_site_packages or requirements')
-        # check that a function is passed, and that it is not a lambda
-        if (not isinstance(self.python_callable,
-                           types.FunctionType) or (self.python_callable.__name__ ==
-                                                   (lambda x: 0).__name__)):
-            raise AirflowException('{} only supports functions for python_callable arg'.format(
-                self.__class__.__name__))
-        # check that args are passed iff python major version matches
-        if (python_version is not None and
-           str(python_version)[0] != str(sys.version_info[0]) and
-           self._pass_op_args()):
-            raise AirflowException("Passing op_args or op_kwargs is not supported across "
-                                   "different Python major versions "
-                                   "for PythonVirtualenvOperator. "
-                                   "Please use string_args.")
+        if not self.system_site_packages and self.use_dill and 'dill' not in self.requirements:
+            self.requirements.append('dill')
+        self.pickling_library = dill if self.use_dill else pickle
 
     def execute_callable(self):
         with TemporaryDirectory(prefix='venv') as tmp_dir:
             if self.templates_dict:
                 self.op_kwargs['templates_dict'] = self.templates_dict
-            # generate filenames
+
             input_filename = os.path.join(tmp_dir, 'script.in')
             output_filename = os.path.join(tmp_dir, 'script.out')
             string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
             script_filename = os.path.join(tmp_dir, 'script.py')
 
-            # set up virtualenv
-            python_bin = 'python' + str(self.python_version) if self.python_version else None
             prepare_virtualenv(
                 venv_directory=tmp_dir,
-                python_bin=python_bin,
+                python_bin=f'python{self.python_version}' if self.python_version else None,
                 system_site_packages=self.system_site_packages,
-                requirements=self.requirements,
+                requirements=self.requirements
             )
 
             self._write_args(input_filename)
-            self._write_script(script_filename)
             self._write_string_args(string_args_filename)
+            self._write_script(script_filename)
+
+            execute_in_subprocess(cmd=[
+                f'{tmp_dir}/bin/python',
+                script_filename,
+                input_filename,
+                output_filename,
+                string_args_filename
+            ])
 
-            # execute command in virtualenv
-            execute_in_subprocess(
-                self._generate_python_cmd(tmp_dir,
-                                          script_filename,
-                                          input_filename,
-                                          output_filename,
-                                          string_args_filename))
             return self._read_result(output_filename)
 
-    def _pass_op_args(self):
-        # we should only pass op_args if any are given to us
-        return len(self.op_args) + len(self.op_kwargs) > 0
+    def _write_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            if self.op_kwargs:
+                # some items from context can't be loaded in virtual env
+                self._keep_serializable_op_kwargs()
+            with open(filename, 'wb') as file:
+                self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
+
+    def _keep_serializable_op_kwargs(self):

Review comment:
       But the problem is that you can add custom `op_kwargs`, which should also be serialized, but you don't know what they are called in the first place. 😁 
   From the `PythonOperator` (the base class of `PythonVirtualenvOperator`):
   ```python
       def execute(self, context: Dict):
           context.update(self.op_kwargs)
           ...
           self.op_kwargs = PythonOperator.determine_op_kwargs(self.python_callable, context, len(self.op_args))
   
           return_value = self.execute_callable()
           ...
   ```
   The `PythonVirtualenvOperator` can only access `self.op_kwargs`, not `context` directly, and `context` is what contains those unserializable objects.
   I would have to override `execute` to change that.
   
   WDYT? @Fokko 
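A minimal, stdlib-only sketch of the ordering problem described in this comment. `merge_like_base_class`, `keep_allowed`, and the one-key allow-list are hypothetical illustrations, not Airflow code, and a `threading.Lock` stands in for an unpicklable context entry such as the task instance. Filtering the already-merged dict with an allow-list also drops the custom kwarg, while filtering `context` before the merge (which overriding `execute` would allow) keeps it.

```python
import pickle
import threading


def merge_like_base_class(context, op_kwargs):
    """Hypothetical stand-in for the base class step `context.update(self.op_kwargs)`."""
    merged = dict(context)
    merged.update(op_kwargs)
    return merged


def keep_allowed(mapping, allowed_keys):
    """Keep only entries whose keys are on a known-serializable allow-list."""
    return {key: value for key, value in mapping.items() if key in allowed_keys}


ALLOWED_CONTEXT_KEYS = {"ds"}  # hypothetical allow-list

# A lock stands in for an unpicklable context entry such as the task instance.
context = {"ds": "2020-06-18", "ti": threading.Lock()}
op_kwargs = {"my_custom_arg": 123}  # its name is not known to the operator in advance

merged = merge_like_base_class(context, op_kwargs)
try:
    pickle.dumps(merged)
    merged_is_picklable = True
except TypeError:
    merged_is_picklable = False

# Filtering after the merge also drops the custom kwarg, since it is not on the list.
filtered_after = keep_allowed(merged, ALLOWED_CONTEXT_KEYS)

# Filtering before the merge only needs to know the context keys and keeps custom kwargs.
filtered_before = merge_like_base_class(keep_allowed(context, ALLOWED_CONTEXT_KEYS), op_kwargs)
payload = pickle.dumps(filtered_before)
```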







[GitHub] [airflow] feluelle commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r452891667



##########
File path: airflow/operators/python.py
##########
@@ -413,140 +463,89 @@ def __init__(  # pylint: disable=too-many-arguments
             templates_exts=templates_exts,
             *args,
             **kwargs)
-        self.requirements = requirements or []
+        self.requirements = list(requirements or [])
         self.string_args = string_args or []
         self.python_version = python_version
         self.use_dill = use_dill
         self.system_site_packages = system_site_packages
-        # check that dill is present if needed
-        dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
-                                   self.requirements)
-        if (not system_site_packages) and use_dill and not any(dill_in_requirements):
-            raise AirflowException('If using dill, dill must be in the environment ' +
-                                   'either via system_site_packages or requirements')
-        # check that a function is passed, and that it is not a lambda
-        if (not isinstance(self.python_callable,
-                           types.FunctionType) or (self.python_callable.__name__ ==
-                                                   (lambda x: 0).__name__)):
-            raise AirflowException('{} only supports functions for python_callable arg'.format(
-                self.__class__.__name__))
-        # check that args are passed iff python major version matches
-        if (python_version is not None and
-           str(python_version)[0] != str(sys.version_info[0]) and
-           self._pass_op_args()):
-            raise AirflowException("Passing op_args or op_kwargs is not supported across "
-                                   "different Python major versions "
-                                   "for PythonVirtualenvOperator. "
-                                   "Please use string_args.")
+        if not self.system_site_packages and self.use_dill and 'dill' not in self.requirements:
+            self.requirements.append('dill')
+        self.pickling_library = dill if self.use_dill else pickle
+
+    def execute(self, context: Dict):
+        serializable_context = {key: context[key] for key in self._get_serializable_context_keys()}
+        super().execute(context=serializable_context)
 
     def execute_callable(self):
         with TemporaryDirectory(prefix='venv') as tmp_dir:
             if self.templates_dict:
                 self.op_kwargs['templates_dict'] = self.templates_dict
-            # generate filenames
+
             input_filename = os.path.join(tmp_dir, 'script.in')
             output_filename = os.path.join(tmp_dir, 'script.out')
             string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
             script_filename = os.path.join(tmp_dir, 'script.py')
 
-            # set up virtualenv
-            python_bin = 'python' + str(self.python_version) if self.python_version else None
             prepare_virtualenv(
                 venv_directory=tmp_dir,
-                python_bin=python_bin,
+                python_bin=f'python{self.python_version}' if self.python_version else None,
                 system_site_packages=self.system_site_packages,
-                requirements=self.requirements,
+                requirements=self.requirements
             )
 
             self._write_args(input_filename)
-            self._write_script(script_filename)
             self._write_string_args(string_args_filename)
+            write_python_script(
+                jinja_context=dict(
+                    op_args=self.op_args,
+                    op_kwargs=self.op_kwargs,
+                    pickling_library=self.pickling_library.__name__,
+                    python_callable=self.python_callable.__name__,
+                    python_callable_source=dedent(inspect.getsource(self.python_callable))
+                ),
+                filename=script_filename
+            )
+
+            execute_in_subprocess(cmd=[
+                f'{tmp_dir}/bin/python',
+                script_filename,
+                input_filename,
+                output_filename,
+                string_args_filename
+            ])
 
-            # execute command in virtualenv
-            execute_in_subprocess(
-                self._generate_python_cmd(tmp_dir,
-                                          script_filename,
-                                          input_filename,
-                                          output_filename,
-                                          string_args_filename))
             return self._read_result(output_filename)
 
-    def _pass_op_args(self):
-        # we should only pass op_args if any are given to us
-        return len(self.op_args) + len(self.op_kwargs) > 0
+    def _write_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            with open(filename, 'wb') as file:
+                self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
+
+    def _get_serializable_context_keys(self):
+        def _is_airflow_env():
+            return self.system_site_packages or 'apache-airflow' in self.requirements
+
+        def _is_pendulum_env():
+            return 'pendulum' in self.requirements and 'lazy_object_proxy' in self.requirements
+
+        serializable_context_keys = self.BASE_SERIALIZABLE_CONTEXT_KEYS.copy()
+        if _is_airflow_env():
+            serializable_context_keys.update(self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS)
+        if _is_pendulum_env() or _is_airflow_env():
+            serializable_context_keys.update(self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS)

Review comment:
       No, that would mean that having pendulum installed would also let you use Airflow resources, which is not correct.
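The asymmetry defended here can be sketched as a standalone function. An Airflow environment implies pendulum, because Airflow depends on it, but pendulum alone must not unlock Airflow-only objects. The pendulum and Airflow key groups are copied from the `pop(...)` calls in the diffs in this thread. The base set is an abbreviated, illustrative subset, and the function is a hypothetical stand-in for `_get_serializable_context_keys`.

```python
# Key groups taken from the diffs in this thread; BASE_KEYS is an abbreviated,
# illustrative subset, not the operator's full BASE_SERIALIZABLE_CONTEXT_KEYS.
BASE_KEYS = {"ds", "ts", "run_id"}
PENDULUM_KEYS = {
    "execution_date",
    "next_execution_date",
    "prev_execution_date",
    "prev_execution_date_success",
    "prev_start_date_success",
}
AIRFLOW_KEYS = {"macros", "conf", "dag", "dag_run", "task"}


def serializable_context_keys(requirements, system_site_packages):
    """Return the context keys that can be unpickled inside the virtualenv (sketch)."""
    is_airflow_env = system_site_packages or "apache-airflow" in requirements
    is_pendulum_env = "pendulum" in requirements and "lazy_object_proxy" in requirements

    keys = set(BASE_KEYS)
    if is_airflow_env:
        # Airflow pulls in pendulum, so both groups become loadable.
        keys |= AIRFLOW_KEYS | PENDULUM_KEYS
    elif is_pendulum_env:
        # pendulum on its own must not unlock Airflow objects such as `dag`.
        keys |= PENDULUM_KEYS
    return keys
```

For example, `serializable_context_keys(["pendulum", "lazy_object_proxy"], False)` includes `execution_date` but not `dag`. `serializable_context_keys(["apache-airflow"], False)` includes both.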




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] feluelle commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r455726402



##########
File path: tests/operators/test_python.py
##########
@@ -1222,23 +1230,42 @@ def f(templates_dict):
         self._run_as_operator(f, templates_dict={'ds': '{{ ds }}'})
 
     def test_airflow_context(self):
-        def f(**context):
-            # not serializable
-            assert 'var' not in context
-            assert 'task_instance' not in context
-            assert 'ti' not in context
-            # require airflow
-            assert 'conf' in context
-            assert 'dag' in context
-            assert 'dag_run' in context
-            assert 'macros' in context
-            assert 'task' in context
-            # require airflow or pendulum
-            assert 'execution_date' in context
-            assert 'next_execution_date' in context
-            assert 'prev_execution_date' in context
-            assert 'prev_execution_date_success' in context
-            assert 'prev_start_date_success' in context
+        def f(
+            # basic
+            ds_nodash,
+            inlets,
+            next_ds,
+            next_ds_nodash,
+            outlets,
+            params,
+            prev_ds,
+            prev_ds_nodash,
+            run_id,
+            task_instance_key_str,
+            test_mode,
+            tomorrow_ds,
+            tomorrow_ds_nodash,
+            ts,
+            ts_nodash,
+            ts_nodash_with_tz,
+            yesterday_ds,
+            yesterday_ds_nodash,
+            # pendulum-specific
+            execution_date,
+            next_execution_date,
+            prev_execution_date,
+            prev_execution_date_success,
+            prev_start_date_success,
+            # airflow-specific
+            macros,
+            conf,
+            dag,
+            dag_run,
+            task,
+            # other
+            **context
+        ):  # pylint: disable=unused-argument,too-many-arguments,too-many-locals
+            pass

Review comment:
       > I am testing it by forcing to unpack the keys given.
   
   It is more like a system test than a unit test now.
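The following is a minimal illustration of the "forcing to unpack" technique, outside Airflow. When context keys are named as parameters, the call itself fails with a `TypeError` if a key was filtered out, while `**context` absorbs any extra keys. The helper `unpacks_cleanly`, the key subset and the values are hypothetical.

```python
def unpacks_cleanly(func, context):
    """Call func(**context) as the operator does and report whether binding succeeded."""
    try:
        func(**context)
    except TypeError:
        # A missing named parameter raises TypeError at call time.
        return False
    return True


def f(ds_nodash, execution_date, dag, **context):  # hypothetical subset of keys
    # Same shape as the test: the body does nothing; the signature does the checking.
    pass


full_context = {
    "ds_nodash": "20200101",
    "execution_date": "2020-01-01",
    "dag": "my_dag",
    "ts": "2020-01-01T00:00:00+00:00",  # extra key, swallowed by **context
}
partial_context = {"ds_nodash": "20200101", "execution_date": "2020-01-01"}  # 'dag' dropped

print(unpacks_cleanly(f, full_context))     # True
print(unpacks_cleanly(f, partial_context))  # False
```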





[GitHub] [airflow] feluelle commented on pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
feluelle commented on pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#issuecomment-656639942


   Ready for another round @turbaszek @Fokko 



[GitHub] [airflow] turbaszek commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r447453937



##########
File path: airflow/operators/python.py
##########
@@ -354,14 +354,18 @@ class PythonVirtualenvOperator(PythonOperator):
     Note that if your virtualenv runs in a different Python major version than Airflow,
     you cannot use return values, op_args, or op_kwargs. You can use string_args though.
 
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:PythonVirtualenvOperator`
+
     :param python_callable: A python function with no references to outside variables,
         defined with def, which will be run in a virtualenv
     :type python_callable: function
     :param requirements: A list of requirements as specified in a pip install command
     :type requirements: list[str]
     :param python_version: The Python version to run the virtualenv with. Note that
         both 2 and 2.7 are acceptable forms.
-    :type python_version: str
+    :type python_version: str or int or float

Review comment:
       ```suggestion
       :type python_version: Union[str, int, float]
   ```
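Accepting `int` and `float` works because the value is only interpolated into the interpreter name, as in `f'python{self.python_version}'` in the diff. The sketch below mirrors that expression; `python_bin_for` is a hypothetical helper name. It also shows a caveat of the float form: `3.10` and `3.1` are the same float, so a string is the safer spelling for two-digit minor versions.

```python
from typing import Optional, Union


def python_bin_for(python_version: Optional[Union[str, int, float]]) -> Optional[str]:
    """Mirror the diff's f'python{self.python_version}' expression (hypothetical helper)."""
    if not python_version:
        # No version requested: pass None through, as the diff does.
        return None
    return f"python{python_version}"


for version in (3, "3.8", 3.8, "3.10", 3.10):
    # The float literal 3.10 equals 3.1, so it yields 'python3.1'.
    print(repr(version), "->", python_bin_for(version))
```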





[GitHub] [airflow] feluelle commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r444165384



##########
File path: airflow/operators/python.py
##########
@@ -269,140 +281,132 @@ def __init__(  # pylint: disable=too-many-arguments
             templates_exts=templates_exts,
             *args,
             **kwargs)
-        self.requirements = requirements or []
+        self.requirements = list(requirements or [])
         self.string_args = string_args or []
         self.python_version = python_version
         self.use_dill = use_dill
         self.system_site_packages = system_site_packages
-        # check that dill is present if needed
-        dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
-                                   self.requirements)
-        if (not system_site_packages) and use_dill and not any(dill_in_requirements):
-            raise AirflowException('If using dill, dill must be in the environment ' +
-                                   'either via system_site_packages or requirements')
-        # check that a function is passed, and that it is not a lambda
-        if (not isinstance(self.python_callable,
-                           types.FunctionType) or (self.python_callable.__name__ ==
-                                                   (lambda x: 0).__name__)):
-            raise AirflowException('{} only supports functions for python_callable arg'.format(
-                self.__class__.__name__))
-        # check that args are passed iff python major version matches
-        if (python_version is not None and
-           str(python_version)[0] != str(sys.version_info[0]) and
-           self._pass_op_args()):
-            raise AirflowException("Passing op_args or op_kwargs is not supported across "
-                                   "different Python major versions "
-                                   "for PythonVirtualenvOperator. "
-                                   "Please use string_args.")
+        if not self.system_site_packages and self.use_dill and 'dill' not in self.requirements:
+            self.requirements.append('dill')
+        self.pickling_library = dill if self.use_dill else pickle
 
     def execute_callable(self):
         with TemporaryDirectory(prefix='venv') as tmp_dir:
             if self.templates_dict:
                 self.op_kwargs['templates_dict'] = self.templates_dict
-            # generate filenames
+
             input_filename = os.path.join(tmp_dir, 'script.in')
             output_filename = os.path.join(tmp_dir, 'script.out')
             string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
             script_filename = os.path.join(tmp_dir, 'script.py')
 
-            # set up virtualenv
-            python_bin = 'python' + str(self.python_version) if self.python_version else None
             prepare_virtualenv(
                 venv_directory=tmp_dir,
-                python_bin=python_bin,
+                python_bin=f'python{self.python_version}' if self.python_version else None,
                 system_site_packages=self.system_site_packages,
-                requirements=self.requirements,
+                requirements=self.requirements
             )
 
             self._write_args(input_filename)
-            self._write_script(script_filename)
             self._write_string_args(string_args_filename)
+            self._write_script(script_filename)
+
+            execute_in_subprocess(cmd=[
+                f'{tmp_dir}/bin/python',
+                script_filename,
+                input_filename,
+                output_filename,
+                string_args_filename
+            ])
 
-            # execute command in virtualenv
-            execute_in_subprocess(
-                self._generate_python_cmd(tmp_dir,
-                                          script_filename,
-                                          input_filename,
-                                          output_filename,
-                                          string_args_filename))
             return self._read_result(output_filename)
 
-    def _pass_op_args(self):
-        # we should only pass op_args if any are given to us
-        return len(self.op_args) + len(self.op_kwargs) > 0
+    def _write_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            if self.op_kwargs:
+                # some items from context can't be loaded in virtual env
+                self._keep_serializable_op_kwargs()
+            print(self.op_kwargs)

Review comment:
       To be removed.





[GitHub] [airflow] turbaszek commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r444027117



##########
File path: airflow/operators/python.py
##########
@@ -269,140 +281,132 @@ def __init__(  # pylint: disable=too-many-arguments
             templates_exts=templates_exts,
             *args,
             **kwargs)
-        self.requirements = requirements or []
+        self.requirements = list(requirements or [])
         self.string_args = string_args or []
         self.python_version = python_version
         self.use_dill = use_dill
         self.system_site_packages = system_site_packages
-        # check that dill is present if needed
-        dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
-                                   self.requirements)
-        if (not system_site_packages) and use_dill and not any(dill_in_requirements):
-            raise AirflowException('If using dill, dill must be in the environment ' +
-                                   'either via system_site_packages or requirements')
-        # check that a function is passed, and that it is not a lambda
-        if (not isinstance(self.python_callable,
-                           types.FunctionType) or (self.python_callable.__name__ ==
-                                                   (lambda x: 0).__name__)):
-            raise AirflowException('{} only supports functions for python_callable arg'.format(
-                self.__class__.__name__))
-        # check that args are passed iff python major version matches
-        if (python_version is not None and
-           str(python_version)[0] != str(sys.version_info[0]) and
-           self._pass_op_args()):
-            raise AirflowException("Passing op_args or op_kwargs is not supported across "
-                                   "different Python major versions "
-                                   "for PythonVirtualenvOperator. "
-                                   "Please use string_args.")
+        if not self.system_site_packages and self.use_dill and 'dill' not in self.requirements:
+            self.requirements.append('dill')
+        self.pickling_library = dill if self.use_dill else pickle
 
     def execute_callable(self):
         with TemporaryDirectory(prefix='venv') as tmp_dir:
             if self.templates_dict:
                 self.op_kwargs['templates_dict'] = self.templates_dict
-            # generate filenames
+
             input_filename = os.path.join(tmp_dir, 'script.in')
             output_filename = os.path.join(tmp_dir, 'script.out')
             string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
             script_filename = os.path.join(tmp_dir, 'script.py')
 
-            # set up virtualenv
-            python_bin = 'python' + str(self.python_version) if self.python_version else None
             prepare_virtualenv(
                 venv_directory=tmp_dir,
-                python_bin=python_bin,
+                python_bin=f'python{self.python_version}' if self.python_version else None,
                 system_site_packages=self.system_site_packages,
-                requirements=self.requirements,
+                requirements=self.requirements
             )
 
             self._write_args(input_filename)
-            self._write_script(script_filename)
             self._write_string_args(string_args_filename)
+            self._write_script(script_filename)
+
+            execute_in_subprocess(cmd=[
+                f'{tmp_dir}/bin/python',
+                script_filename,
+                input_filename,
+                output_filename,
+                string_args_filename
+            ])
 
-            # execute command in virtualenv
-            execute_in_subprocess(
-                self._generate_python_cmd(tmp_dir,
-                                          script_filename,
-                                          input_filename,
-                                          output_filename,
-                                          string_args_filename))
             return self._read_result(output_filename)
 
-    def _pass_op_args(self):
-        # we should only pass op_args if any are given to us
-        return len(self.op_args) + len(self.op_kwargs) > 0
+    def _write_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            if self.op_kwargs:
+                # some items from context can't be loaded in virtual env
+                self._keep_serializable_op_kwargs()
+            print(self.op_kwargs)
+            with open(filename, 'wb') as file:
+                self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
+
+    def _keep_serializable_op_kwargs(self):
+        # Remove unserializable objects
+        # otherwise "KeyError: 'Variable __getstate__ does not exist'" would be raised.
+        self.op_kwargs.pop('var', None)
+        # otherwise "TypeError: cannot serialize '_io.FileIO' object" would be raised.
+        self.op_kwargs.pop('task_instance', None)
+        self.op_kwargs.pop('ti', None)
+
+        if self.system_site_packages or 'apache-airflow' in self.requirements:
+            # All can be serialized expecting it to run in an airflow env.
+            return
+
+        # Not access to host packages and no apache-airflow installed.
+        # Remove airflow specific context
+        # otherwise "ModuleNotFoundError: No module named 'airflow'" would be raised.
+        self.op_kwargs.pop('macros', None)
+        self.op_kwargs.pop('conf', None)
+        self.op_kwargs.pop('dag', None)
+        self.op_kwargs.pop('dag_run', None)
+        self.op_kwargs.pop('task', None)
+
+        if 'pendulum' in self.requirements and 'lazy_object_proxy' in self.requirements:
+            # ..but pendulum is installed so keep pendulum date objects
+            # Note: 'lazy_object_proxy' is needed to work.
+            return
+
+        # No pendulum is installed either. So remove pendulum specific context.
+        # otherwise "ModuleNotFoundError: No module named 'pendulum'" would be raised.
+        self.op_kwargs.pop('execution_date', None)
+        self.op_kwargs.pop('next_execution_date', None)
+        self.op_kwargs.pop('prev_execution_date', None)
+        self.op_kwargs.pop('prev_execution_date_success', None)
+        self.op_kwargs.pop('prev_start_date_success', None)
 
     def _write_string_args(self, filename):
-        # writes string_args to a file, which are read line by line
         with open(filename, 'w') as file:
             file.write('\n'.join(map(str, self.string_args)))
 
-    def _write_args(self, input_filename):
-        # serialize args to file
-        if self._pass_op_args():
-            with open(input_filename, 'wb') as file:
-                arg_dict = ({'args': self.op_args, 'kwargs': self.op_kwargs})
-                if self.use_dill:
-                    dill.dump(arg_dict, file)
-                else:
-                    pickle.dump(arg_dict, file)
-
-    def _read_result(self, output_filename):
-        if os.stat(output_filename).st_size == 0:
+    def _code__imports(self):
+        return f'import {self.pickling_library.__name__}\n' \
+               'import sys\n'
+
+    def _code__read_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            return f'with open({filename}, "rb") as file:\n' \
+                   f'    arg_dict = {self.pickling_library.__name__}.load(file)\n'
+        return 'arg_dict = {"args": [], "kwargs": {}}\n'
+
+    def _code__read_string_args(self, filename):
+        return f'with open({filename}, "r") as file:\n' \
+               '    virtualenv_string_args = list(map(lambda x: x.strip(), list(file)))\n'
+
+    def _code__write_output(self, filename):
+        return f'with open({filename}, "wb") as file:\n' \
+               f'    if res: {self.pickling_library.__name__}.dump(res, file)\n'
+
+    def _code__call_script(self, arg_dict):
+        return f'{dedent(inspect.getsource(self.python_callable))}\n' \
+               f'res = {self.python_callable.__name__}(*{arg_dict}["args"], **{arg_dict}["kwargs"])\n'
+
+    def _write_script(self, filename):
+        with open(filename, 'w') as file:
+            python_code = f"{self._code__imports()}" \
+                          f"{self._code__read_args(filename='sys.argv[1]')}" \
+                          f"{self._code__read_string_args(filename='sys.argv[3]')}" \
+                          f"{self._code__call_script(arg_dict='arg_dict')}" \
+                          f"{self._code__write_output(filename='sys.argv[2]')}"
+            self.log.debug('Writing code to file\n %s', python_code)
+            file.write(python_code)

Review comment:
       @feluelle what would you say to using a Jinja template? I think it would make it easier to understand what the `python_code` looks like. We used a similar approach in the operator generator: 
   https://github.com/PolideaInternal/airflow-munchkin/blob/master/airflow_munchkin/template/discovery/operator_class.tpl
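Here is a sketch of how the suggestion could look: a single Jinja template that reproduces what the `_code__*` helpers above concatenate. It assumes `jinja2` is installed. The template text and the `render_script` name are illustrative, not the template Airflow ships. The context keys follow the `jinja_context` that a later revision of the PR passes to `write_python_script`. The callable's source is passed in as a string; the operator would obtain it with `dedent(inspect.getsource(...))`.

```python
from jinja2 import Template

# Illustrative template mirroring the _code__imports/_read_args/_read_string_args/
# _call_script/_write_output helpers. Unlike `if res:` in the diff, it writes
# any result that is not None, so falsy values such as 0 survive the round trip.
SCRIPT_TEMPLATE = Template(
    '''\
import {{ pickling_library }}
import sys

{% if op_args or op_kwargs %}
with open(sys.argv[1], "rb") as file:
    arg_dict = {{ pickling_library }}.load(file)
{% else %}
arg_dict = {"args": [], "kwargs": {}}
{% endif %}

with open(sys.argv[3], "r") as file:
    virtualenv_string_args = list(map(lambda x: x.strip(), list(file)))

{{ python_callable_source }}
res = {{ python_callable }}(*arg_dict["args"], **arg_dict["kwargs"])

with open(sys.argv[2], "wb") as file:
    if res is not None:
        {{ pickling_library }}.dump(res, file)
''',
    trim_blocks=True,   # drop the newline after each {% ... %} tag
    lstrip_blocks=True,  # drop whitespace before block tags
)


def render_script(**jinja_context):
    """Render the virtualenv script from a context like the one in the later diff."""
    return SCRIPT_TEMPLATE.render(**jinja_context)
```

Design note: the whole generated script is now readable in one place, which was the point of the suggestion. The `if res:` in the diff appears to drop falsy return values such as `0` or `[]`, because the output file stays empty. The template above avoids that.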





[GitHub] [airflow] feluelle commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r444159472



##########
File path: airflow/operators/python.py
##########
@@ -269,140 +281,132 @@ def __init__(  # pylint: disable=too-many-arguments
             templates_exts=templates_exts,
             *args,
             **kwargs)
-        self.requirements = requirements or []
+        self.requirements = list(requirements or [])
         self.string_args = string_args or []
         self.python_version = python_version
         self.use_dill = use_dill
         self.system_site_packages = system_site_packages
-        # check that dill is present if needed
-        dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
-                                   self.requirements)
-        if (not system_site_packages) and use_dill and not any(dill_in_requirements):
-            raise AirflowException('If using dill, dill must be in the environment ' +
-                                   'either via system_site_packages or requirements')
-        # check that a function is passed, and that it is not a lambda
-        if (not isinstance(self.python_callable,
-                           types.FunctionType) or (self.python_callable.__name__ ==
-                                                   (lambda x: 0).__name__)):
-            raise AirflowException('{} only supports functions for python_callable arg'.format(
-                self.__class__.__name__))
-        # check that args are passed iff python major version matches
-        if (python_version is not None and
-           str(python_version)[0] != str(sys.version_info[0]) and
-           self._pass_op_args()):
-            raise AirflowException("Passing op_args or op_kwargs is not supported across "
-                                   "different Python major versions "
-                                   "for PythonVirtualenvOperator. "
-                                   "Please use string_args.")
+        if not self.system_site_packages and self.use_dill and 'dill' not in self.requirements:
+            self.requirements.append('dill')
+        self.pickling_library = dill if self.use_dill else pickle
 
     def execute_callable(self):
         with TemporaryDirectory(prefix='venv') as tmp_dir:
             if self.templates_dict:
                 self.op_kwargs['templates_dict'] = self.templates_dict
-            # generate filenames
+
             input_filename = os.path.join(tmp_dir, 'script.in')
             output_filename = os.path.join(tmp_dir, 'script.out')
             string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
             script_filename = os.path.join(tmp_dir, 'script.py')
 
-            # set up virtualenv
-            python_bin = 'python' + str(self.python_version) if self.python_version else None
             prepare_virtualenv(
                 venv_directory=tmp_dir,
-                python_bin=python_bin,
+                python_bin=f'python{self.python_version}' if self.python_version else None,
                 system_site_packages=self.system_site_packages,
-                requirements=self.requirements,
+                requirements=self.requirements
             )
 
             self._write_args(input_filename)
-            self._write_script(script_filename)
             self._write_string_args(string_args_filename)
+            self._write_script(script_filename)
+
+            execute_in_subprocess(cmd=[
+                f'{tmp_dir}/bin/python',
+                script_filename,
+                input_filename,
+                output_filename,
+                string_args_filename
+            ])
 
-            # execute command in virtualenv
-            execute_in_subprocess(
-                self._generate_python_cmd(tmp_dir,
-                                          script_filename,
-                                          input_filename,
-                                          output_filename,
-                                          string_args_filename))
             return self._read_result(output_filename)
 
-    def _pass_op_args(self):
-        # we should only pass op_args if any are given to us
-        return len(self.op_args) + len(self.op_kwargs) > 0
+    def _write_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            if self.op_kwargs:
+                # some items from context can't be loaded in virtual env
+                self._keep_serializable_op_kwargs()
+            print(self.op_kwargs)
+            with open(filename, 'wb') as file:
+                self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
+
+    def _keep_serializable_op_kwargs(self):
+        # Remove unserializable objects
+        # otherwise "KeyError: 'Variable __getstate__ does not exist'" would be raised.
+        self.op_kwargs.pop('var', None)
+        # otherwise "TypeError: cannot serialize '_io.FileIO' object" would be raised.
+        self.op_kwargs.pop('task_instance', None)
+        self.op_kwargs.pop('ti', None)
+
+        if self.system_site_packages or 'apache-airflow' in self.requirements:
+            # All can be serialized expecting it to run in an airflow env.
+            return
+
+        # Not access to host packages and no apache-airflow installed.
+        # Remove airflow specific context
+        # otherwise "ModuleNotFoundError: No module named 'airflow'" would be raised.
+        self.op_kwargs.pop('macros', None)
+        self.op_kwargs.pop('conf', None)
+        self.op_kwargs.pop('dag', None)
+        self.op_kwargs.pop('dag_run', None)
+        self.op_kwargs.pop('task', None)
+
+        if 'pendulum' in self.requirements and 'lazy_object_proxy' in self.requirements:
+            # ..but pendulum is installed so keep pendulum date objects
+            # Note: 'lazy_object_proxy' is needed to work.
+            return
+
+        # No pendulum is installed either. So remove pendulum specific context.
+        # otherwise "ModuleNotFoundError: No module named 'pendulum'" would be raised.
+        self.op_kwargs.pop('execution_date', None)
+        self.op_kwargs.pop('next_execution_date', None)
+        self.op_kwargs.pop('prev_execution_date', None)
+        self.op_kwargs.pop('prev_execution_date_success', None)
+        self.op_kwargs.pop('prev_start_date_success', None)
 
     def _write_string_args(self, filename):
-        # writes string_args to a file, which are read line by line
         with open(filename, 'w') as file:
             file.write('\n'.join(map(str, self.string_args)))
 
-    def _write_args(self, input_filename):
-        # serialize args to file
-        if self._pass_op_args():
-            with open(input_filename, 'wb') as file:
-                arg_dict = ({'args': self.op_args, 'kwargs': self.op_kwargs})
-                if self.use_dill:
-                    dill.dump(arg_dict, file)
-                else:
-                    pickle.dump(arg_dict, file)
-
-    def _read_result(self, output_filename):
-        if os.stat(output_filename).st_size == 0:
+    def _code__imports(self):
+        return f'import {self.pickling_library.__name__}\n' \
+               'import sys\n'
+
+    def _code__read_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            return f'with open({filename}, "rb") as file:\n' \
+                   f'    arg_dict = {self.pickling_library.__name__}.load(file)\n'
+        return 'arg_dict = {"args": [], "kwargs": {}}\n'
+
+    def _code__read_string_args(self, filename):
+        return f'with open({filename}, "r") as file:\n' \
+               '    virtualenv_string_args = list(map(lambda x: x.strip(), list(file)))\n'
+
+    def _code__write_output(self, filename):
+        return f'with open({filename}, "wb") as file:\n' \
+               f'    if res: {self.pickling_library.__name__}.dump(res, file)\n'
+
+    def _code__call_script(self, arg_dict):
+        return f'{dedent(inspect.getsource(self.python_callable))}\n' \
+               f'res = {self.python_callable.__name__}(*{arg_dict}["args"], **{arg_dict}["kwargs"])\n'
+
+    def _write_script(self, filename):
+        with open(filename, 'w') as file:
+            python_code = f"{self._code__imports()}" \
+                          f"{self._code__read_args(filename='sys.argv[1]')}" \
+                          f"{self._code__read_string_args(filename='sys.argv[3]')}" \
+                          f"{self._code__call_script(arg_dict='arg_dict')}" \
+                          f"{self._code__write_output(filename='sys.argv[2]')}"
+            self.log.debug('Writing code to file\n %s', python_code)
+            file.write(python_code)

Review comment:
       Thanks - will take a look.





[GitHub] [airflow] turbaszek commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r444816940



##########
File path: airflow/operators/python.py
##########
@@ -412,140 +427,113 @@ def __init__(  # pylint: disable=too-many-arguments
             templates_exts=templates_exts,
             *args,
             **kwargs)
-        self.requirements = requirements or []
+        self.requirements = list(requirements or [])
         self.string_args = string_args or []
         self.python_version = python_version
         self.use_dill = use_dill
         self.system_site_packages = system_site_packages
-        # check that dill is present if needed
-        dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
-                                   self.requirements)
-        if (not system_site_packages) and use_dill and not any(dill_in_requirements):
-            raise AirflowException('If using dill, dill must be in the environment ' +
-                                   'either via system_site_packages or requirements')
-        # check that a function is passed, and that it is not a lambda
-        if (not isinstance(self.python_callable,
-                           types.FunctionType) or (self.python_callable.__name__ ==
-                                                   (lambda x: 0).__name__)):
-            raise AirflowException('{} only supports functions for python_callable arg'.format(
-                self.__class__.__name__))
-        # check that args are passed iff python major version matches
-        if (python_version is not None and
-           str(python_version)[0] != str(sys.version_info[0]) and
-           self._pass_op_args()):
-            raise AirflowException("Passing op_args or op_kwargs is not supported across "
-                                   "different Python major versions "
-                                   "for PythonVirtualenvOperator. "
-                                   "Please use string_args.")
+        if not self.system_site_packages and self.use_dill and 'dill' not in self.requirements:
+            self.requirements.append('dill')
+        self.pickling_library = dill if self.use_dill else pickle
 
     def execute_callable(self):
         with TemporaryDirectory(prefix='venv') as tmp_dir:
             if self.templates_dict:
                 self.op_kwargs['templates_dict'] = self.templates_dict
-            # generate filenames
+
             input_filename = os.path.join(tmp_dir, 'script.in')
             output_filename = os.path.join(tmp_dir, 'script.out')
             string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
             script_filename = os.path.join(tmp_dir, 'script.py')
 
-            # set up virtualenv
-            python_bin = 'python' + str(self.python_version) if self.python_version else None
             prepare_virtualenv(
                 venv_directory=tmp_dir,
-                python_bin=python_bin,
+                python_bin=f'python{self.python_version}' if self.python_version else None,
                 system_site_packages=self.system_site_packages,
-                requirements=self.requirements,
+                requirements=self.requirements
             )
 
             self._write_args(input_filename)
-            self._write_script(script_filename)
             self._write_string_args(string_args_filename)
+            self._write_script(script_filename)
+
+            execute_in_subprocess(cmd=[
+                f'{tmp_dir}/bin/python',
+                script_filename,
+                input_filename,
+                output_filename,
+                string_args_filename
+            ])
 
-            # execute command in virtualenv
-            execute_in_subprocess(
-                self._generate_python_cmd(tmp_dir,
-                                          script_filename,
-                                          input_filename,
-                                          output_filename,
-                                          string_args_filename))
             return self._read_result(output_filename)
 
-    def _pass_op_args(self):
-        # we should only pass op_args if any are given to us
-        return len(self.op_args) + len(self.op_kwargs) > 0
+    def _write_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            if self.op_kwargs:
+                # some items from context can't be loaded in virtual env
+                self._keep_serializable_op_kwargs()
+            with open(filename, 'wb') as file:
+                self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
+
+    def _keep_serializable_op_kwargs(self):
+        # Remove unserializable objects
+        # otherwise "KeyError: 'Variable __getstate__ does not exist'" would be raised.
+        self.op_kwargs.pop('var', None)
+        # otherwise "TypeError: cannot serialize '_io.FileIO' object" would be raised.
+        self.op_kwargs.pop('task_instance', None)
+        self.op_kwargs.pop('ti', None)
+
+        if self.system_site_packages or 'apache-airflow' in self.requirements:
+            # All can be serialized expecting it to run in an airflow env.
+            return
+
+        # Not access to host packages and no apache-airflow installed.
+        # Remove airflow specific context
+        # otherwise "ModuleNotFoundError: No module named 'airflow'" would be raised.
+        self.op_kwargs.pop('macros', None)
+        self.op_kwargs.pop('conf', None)
+        self.op_kwargs.pop('dag', None)
+        self.op_kwargs.pop('dag_run', None)
+        self.op_kwargs.pop('task', None)
+
+        if 'pendulum' in self.requirements and 'lazy_object_proxy' in self.requirements:
+            # ..but pendulum is installed so keep pendulum date objects
+            # Note: 'lazy_object_proxy' is needed to work.
+            return
+
+        # No pendulum is installed either. So remove pendulum specific context.
+        # otherwise "ModuleNotFoundError: No module named 'pendulum'" would be raised.
+        self.op_kwargs.pop('execution_date', None)
+        self.op_kwargs.pop('next_execution_date', None)
+        self.op_kwargs.pop('prev_execution_date', None)
+        self.op_kwargs.pop('prev_execution_date_success', None)
+        self.op_kwargs.pop('prev_start_date_success', None)
 
     def _write_string_args(self, filename):
-        # writes string_args to a file, which are read line by line
         with open(filename, 'w') as file:
             file.write('\n'.join(map(str, self.string_args)))
 
-    def _write_args(self, input_filename):
-        # serialize args to file
-        if self._pass_op_args():
-            with open(input_filename, 'wb') as file:
-                arg_dict = ({'args': self.op_args, 'kwargs': self.op_kwargs})
-                if self.use_dill:
-                    dill.dump(arg_dict, file)
-                else:
-                    pickle.dump(arg_dict, file)
-
-    def _read_result(self, output_filename):
-        if os.stat(output_filename).st_size == 0:
+    def _write_script(self, filename):
+        with open(filename, 'w') as file:
+            python_code = render_virtualenv_script(
+                jinja_context=dict(
+                    op_args=self.op_args,
+                    op_kwargs=self.op_kwargs,
+                    pickling_library=self.pickling_library.__name__,
+                    python_callable=self.python_callable.__name__,
+                    python_callable_source=dedent(inspect.getsource(self.python_callable))
+                )
+            )
+            self.log.debug('Writing code to file\n %s', python_code)

Review comment:
   ```suggestion
               self.log.debug('Writing code to file: %s', python_code)
   ```
   WDYT?
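   As a rough illustration of how the two format strings differ, the sketch below builds a `logging.LogRecord` by hand and renders it with `getMessage()`. That method applies the same `%`-style substitution that `self.log.debug` would. The logger name, path and sample script text are placeholders, not values from the PR.

```python
import logging


def render(fmt: str, *args: object) -> str:
    # Build a LogRecord the way logger.debug() would and apply %-formatting to it.
    record = logging.LogRecord("airflow.task", logging.DEBUG, "example.py", 0, fmt, args, None)
    return record.getMessage()


# Placeholder for the generated script text that would be logged.
code = "import pickle\nimport sys\n"
original = render('Writing code to file\n %s', code)
suggested = render('Writing code to file: %s', code)
```

   With the original string the code starts on a new line, but only its first line picks up the leading space. The suggestion puts the first line of code directly after the colon instead.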







[GitHub] [airflow] feluelle commented on pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
feluelle commented on pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#issuecomment-651304857


   PTAL @turbaszek @maganaluis @ashb @Fokko 





[GitHub] [airflow] mik-laj commented on pull request #9394: Refactor PythonVirtualenvOperator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#issuecomment-646519794


   @feluelle I am a bit busy with other changes. I'm afraid I may not find time to look at it.
   
   @Fokko Can you look at it? I know you are more familiar with this code.





[GitHub] [airflow] feluelle commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r453293281



##########
File path: airflow/operators/python.py
##########
@@ -413,140 +463,89 @@ def __init__(  # pylint: disable=too-many-arguments
             templates_exts=templates_exts,
             *args,
             **kwargs)
-        self.requirements = requirements or []
+        self.requirements = list(requirements or [])
         self.string_args = string_args or []
         self.python_version = python_version
         self.use_dill = use_dill
         self.system_site_packages = system_site_packages
-        # check that dill is present if needed
-        dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
-                                   self.requirements)
-        if (not system_site_packages) and use_dill and not any(dill_in_requirements):
-            raise AirflowException('If using dill, dill must be in the environment ' +
-                                   'either via system_site_packages or requirements')
-        # check that a function is passed, and that it is not a lambda
-        if (not isinstance(self.python_callable,
-                           types.FunctionType) or (self.python_callable.__name__ ==
-                                                   (lambda x: 0).__name__)):
-            raise AirflowException('{} only supports functions for python_callable arg'.format(
-                self.__class__.__name__))
-        # check that args are passed iff python major version matches
-        if (python_version is not None and
-           str(python_version)[0] != str(sys.version_info[0]) and
-           self._pass_op_args()):
-            raise AirflowException("Passing op_args or op_kwargs is not supported across "
-                                   "different Python major versions "
-                                   "for PythonVirtualenvOperator. "
-                                   "Please use string_args.")
+        if not self.system_site_packages and self.use_dill and 'dill' not in self.requirements:
+            self.requirements.append('dill')
+        self.pickling_library = dill if self.use_dill else pickle
+
+    def execute(self, context: Dict):
+        serializable_context = {key: context[key] for key in self._get_serializable_context_keys()}
+        super().execute(context=serializable_context)
 
     def execute_callable(self):
         with TemporaryDirectory(prefix='venv') as tmp_dir:
             if self.templates_dict:
                 self.op_kwargs['templates_dict'] = self.templates_dict
-            # generate filenames
+
             input_filename = os.path.join(tmp_dir, 'script.in')
             output_filename = os.path.join(tmp_dir, 'script.out')
             string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
             script_filename = os.path.join(tmp_dir, 'script.py')
 
-            # set up virtualenv
-            python_bin = 'python' + str(self.python_version) if self.python_version else None
             prepare_virtualenv(
                 venv_directory=tmp_dir,
-                python_bin=python_bin,
+                python_bin=f'python{self.python_version}' if self.python_version else None,
                 system_site_packages=self.system_site_packages,
-                requirements=self.requirements,
+                requirements=self.requirements
             )
 
             self._write_args(input_filename)
-            self._write_script(script_filename)
             self._write_string_args(string_args_filename)
+            write_python_script(
+                jinja_context=dict(
+                    op_args=self.op_args,
+                    op_kwargs=self.op_kwargs,
+                    pickling_library=self.pickling_library.__name__,
+                    python_callable=self.python_callable.__name__,
+                    python_callable_source=dedent(inspect.getsource(self.python_callable))
+                ),
+                filename=script_filename
+            )
+
+            execute_in_subprocess(cmd=[
+                f'{tmp_dir}/bin/python',
+                script_filename,
+                input_filename,
+                output_filename,
+                string_args_filename
+            ])
 
-            # execute command in virtualenv
-            execute_in_subprocess(
-                self._generate_python_cmd(tmp_dir,
-                                          script_filename,
-                                          input_filename,
-                                          output_filename,
-                                          string_args_filename))
             return self._read_result(output_filename)
 
-    def _pass_op_args(self):
-        # we should only pass op_args if any are given to us
-        return len(self.op_args) + len(self.op_kwargs) > 0
+    def _write_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            with open(filename, 'wb') as file:
+                self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
+
+    def _get_serializable_context_keys(self):
+        def _is_airflow_env():
+            return self.system_site_packages or 'apache-airflow' in self.requirements
+
+        def _is_pendulum_env():
+            return 'pendulum' in self.requirements and 'lazy_object_proxy' in self.requirements
+
+        serializable_context_keys = self.BASE_SERIALIZABLE_CONTEXT_KEYS.copy()
+        if _is_airflow_env():
+            serializable_context_keys.update(self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS)
+        if _is_pendulum_env() or _is_airflow_env():
+            serializable_context_keys.update(self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS)

Review comment:
       I am not sure. Both look good to me. What do others think? Maybe @Fokko? :)
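   The whitelisting approach in `_get_serializable_context_keys` can be sketched outside the operator as plain set arithmetic. The three key sets below are an illustrative subset taken from the parameter names in the updated `test_airflow_context`; the real `BASE_SERIALIZABLE_CONTEXT_KEYS`, `PENDULUM_SERIALIZABLE_CONTEXT_KEYS` and `AIRFLOW_SERIALIZABLE_CONTEXT_KEYS` constants are not shown in this hunk, and the helper names are hypothetical.

```python
from typing import Any, Dict, Iterable, Set

# Illustrative subsets only; the operator's real constants are not shown in the hunk.
BASE_KEYS: Set[str] = {"ds_nodash", "next_ds", "run_id", "ts"}
PENDULUM_KEYS: Set[str] = {"execution_date", "next_execution_date", "prev_execution_date"}
AIRFLOW_KEYS: Set[str] = {"macros", "conf", "dag", "dag_run", "task"}


def serializable_context_keys(requirements: Iterable[str], system_site_packages: bool) -> Set[str]:
    requirements = list(requirements)
    # Host packages or an explicit apache-airflow requirement mean airflow objects can be unpickled.
    is_airflow_env = system_site_packages or "apache-airflow" in requirements
    # pendulum date objects need both pendulum and lazy_object_proxy to be importable.
    is_pendulum_env = "pendulum" in requirements and "lazy_object_proxy" in requirements

    keys = set(BASE_KEYS)
    if is_airflow_env:
        keys |= AIRFLOW_KEYS
    if is_airflow_env or is_pendulum_env:
        keys |= PENDULUM_KEYS
    return keys


def filter_context(context: Dict[str, Any], requirements: Iterable[str],
                   system_site_packages: bool) -> Dict[str, Any]:
    allowed = serializable_context_keys(requirements, system_site_packages)
    # Iterate over the context rather than the whitelist, so absent keys are simply skipped.
    return {key: value for key, value in context.items() if key in allowed}
```

   Unlike the `context[key]` lookup in the hunk's `execute`, this version skips whitelisted keys that are absent from the context instead of raising `KeyError`. Which behaviour is preferable is a judgment call.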







[GitHub] [airflow] feluelle commented on pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
feluelle commented on pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#issuecomment-665610177









[GitHub] [airflow] feluelle commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r444811541



##########
File path: airflow/operators/python.py
##########
@@ -269,140 +281,132 @@ def __init__(  # pylint: disable=too-many-arguments
             templates_exts=templates_exts,
             *args,
             **kwargs)
-        self.requirements = requirements or []
+        self.requirements = list(requirements or [])
         self.string_args = string_args or []
         self.python_version = python_version
         self.use_dill = use_dill
         self.system_site_packages = system_site_packages
-        # check that dill is present if needed
-        dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
-                                   self.requirements)
-        if (not system_site_packages) and use_dill and not any(dill_in_requirements):
-            raise AirflowException('If using dill, dill must be in the environment ' +
-                                   'either via system_site_packages or requirements')
-        # check that a function is passed, and that it is not a lambda
-        if (not isinstance(self.python_callable,
-                           types.FunctionType) or (self.python_callable.__name__ ==
-                                                   (lambda x: 0).__name__)):
-            raise AirflowException('{} only supports functions for python_callable arg'.format(
-                self.__class__.__name__))
-        # check that args are passed iff python major version matches
-        if (python_version is not None and
-           str(python_version)[0] != str(sys.version_info[0]) and
-           self._pass_op_args()):
-            raise AirflowException("Passing op_args or op_kwargs is not supported across "
-                                   "different Python major versions "
-                                   "for PythonVirtualenvOperator. "
-                                   "Please use string_args.")
+        if not self.system_site_packages and self.use_dill and 'dill' not in self.requirements:
+            self.requirements.append('dill')
+        self.pickling_library = dill if self.use_dill else pickle
 
     def execute_callable(self):
         with TemporaryDirectory(prefix='venv') as tmp_dir:
             if self.templates_dict:
                 self.op_kwargs['templates_dict'] = self.templates_dict
-            # generate filenames
+
             input_filename = os.path.join(tmp_dir, 'script.in')
             output_filename = os.path.join(tmp_dir, 'script.out')
             string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
             script_filename = os.path.join(tmp_dir, 'script.py')
 
-            # set up virtualenv
-            python_bin = 'python' + str(self.python_version) if self.python_version else None
             prepare_virtualenv(
                 venv_directory=tmp_dir,
-                python_bin=python_bin,
+                python_bin=f'python{self.python_version}' if self.python_version else None,
                 system_site_packages=self.system_site_packages,
-                requirements=self.requirements,
+                requirements=self.requirements
             )
 
             self._write_args(input_filename)
-            self._write_script(script_filename)
             self._write_string_args(string_args_filename)
+            self._write_script(script_filename)
+
+            execute_in_subprocess(cmd=[
+                f'{tmp_dir}/bin/python',
+                script_filename,
+                input_filename,
+                output_filename,
+                string_args_filename
+            ])
 
-            # execute command in virtualenv
-            execute_in_subprocess(
-                self._generate_python_cmd(tmp_dir,
-                                          script_filename,
-                                          input_filename,
-                                          output_filename,
-                                          string_args_filename))
             return self._read_result(output_filename)
 
-    def _pass_op_args(self):
-        # we should only pass op_args if any are given to us
-        return len(self.op_args) + len(self.op_kwargs) > 0
+    def _write_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            if self.op_kwargs:
+                # some items from context can't be loaded in virtual env
+                self._keep_serializable_op_kwargs()
+            print(self.op_kwargs)
+            with open(filename, 'wb') as file:
+                self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
+
+    def _keep_serializable_op_kwargs(self):
+        # Remove unserializable objects
+        # otherwise "KeyError: 'Variable __getstate__ does not exist'" would be raised.
+        self.op_kwargs.pop('var', None)
+        # otherwise "TypeError: cannot serialize '_io.FileIO' object" would be raised.
+        self.op_kwargs.pop('task_instance', None)
+        self.op_kwargs.pop('ti', None)
+
+        if self.system_site_packages or 'apache-airflow' in self.requirements:
+            # All can be serialized expecting it to run in an airflow env.
+            return
+
+        # Not access to host packages and no apache-airflow installed.
+        # Remove airflow specific context
+        # otherwise "ModuleNotFoundError: No module named 'airflow'" would be raised.
+        self.op_kwargs.pop('macros', None)
+        self.op_kwargs.pop('conf', None)
+        self.op_kwargs.pop('dag', None)
+        self.op_kwargs.pop('dag_run', None)
+        self.op_kwargs.pop('task', None)
+
+        if 'pendulum' in self.requirements and 'lazy_object_proxy' in self.requirements:
+            # ..but pendulum is installed so keep pendulum date objects
+            # Note: 'lazy_object_proxy' is needed to work.
+            return
+
+        # No pendulum is installed either. So remove pendulum specific context.
+        # otherwise "ModuleNotFoundError: No module named 'pendulum'" would be raised.
+        self.op_kwargs.pop('execution_date', None)
+        self.op_kwargs.pop('next_execution_date', None)
+        self.op_kwargs.pop('prev_execution_date', None)
+        self.op_kwargs.pop('prev_execution_date_success', None)
+        self.op_kwargs.pop('prev_start_date_success', None)
 
     def _write_string_args(self, filename):
-        # writes string_args to a file, which are read line by line
         with open(filename, 'w') as file:
             file.write('\n'.join(map(str, self.string_args)))
 
-    def _write_args(self, input_filename):
-        # serialize args to file
-        if self._pass_op_args():
-            with open(input_filename, 'wb') as file:
-                arg_dict = ({'args': self.op_args, 'kwargs': self.op_kwargs})
-                if self.use_dill:
-                    dill.dump(arg_dict, file)
-                else:
-                    pickle.dump(arg_dict, file)
-
-    def _read_result(self, output_filename):
-        if os.stat(output_filename).st_size == 0:
+    def _code__imports(self):
+        return f'import {self.pickling_library.__name__}\n' \
+               'import sys\n'
+
+    def _code__read_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            return f'with open({filename}, "rb") as file:\n' \
+                   f'    arg_dict = {self.pickling_library.__name__}.load(file)\n'
+        return 'arg_dict = {"args": [], "kwargs": {}}\n'
+
+    def _code__read_string_args(self, filename):
+        return f'with open({filename}, "r") as file:\n' \
+               '    virtualenv_string_args = list(map(lambda x: x.strip(), list(file)))\n'
+
+    def _code__write_output(self, filename):
+        return f'with open({filename}, "wb") as file:\n' \
+               f'    if res: {self.pickling_library.__name__}.dump(res, file)\n'
+
+    def _code__call_script(self, arg_dict):
+        return f'{dedent(inspect.getsource(self.python_callable))}\n' \
+               f'res = {self.python_callable.__name__}(*{arg_dict}["args"], **{arg_dict}["kwargs"])\n'
+
+    def _write_script(self, filename):
+        with open(filename, 'w') as file:
+            python_code = f"{self._code__imports()}" \
+                          f"{self._code__read_args(filename='sys.argv[1]')}" \
+                          f"{self._code__read_string_args(filename='sys.argv[3]')}" \
+                          f"{self._code__call_script(arg_dict='arg_dict')}" \
+                          f"{self._code__write_output(filename='sys.argv[2]')}"
+            self.log.debug('Writing code to file\n %s', python_code)
+            file.write(python_code)

Review comment:
       Done @turbaszek 👍 
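   The `_code__*` helpers above assemble a small script that reads pickled arguments from `sys.argv[1]` and string arguments from `sys.argv[3]`, calls the function, and pickles the result to `sys.argv[2]`. The sketch below reproduces that file-based protocol end to end with the standard library only. It is a simplified stand-in, not the operator's code: it uses `pickle` rather than `dill`, runs the script with `sys.executable` instead of the virtualenv's `bin/python`, and the `add` callable and `run_in_subprocess` helper are hypothetical. It also pickles the result unconditionally, whereas the generated `if res:` line leaves the output file empty for falsy return values.

```python
import os
import pickle
import subprocess
import sys
import tempfile
import textwrap
from typing import Any, Dict, List

# Hypothetical stand-in for the script the operator generates.
SCRIPT = textwrap.dedent('''\
    import pickle
    import sys

    with open(sys.argv[1], "rb") as file:
        arg_dict = pickle.load(file)

    with open(sys.argv[3], "r") as file:
        virtualenv_string_args = [line.strip() for line in file]

    def add(x, y):
        # string args are visible to the callable as a module-level global
        return x + y, virtualenv_string_args

    res = add(*arg_dict["args"], **arg_dict["kwargs"])

    with open(sys.argv[2], "wb") as file:
        pickle.dump(res, file)
''')


def run_in_subprocess(op_args: List[Any], op_kwargs: Dict[str, Any], string_args: List[str]) -> Any:
    with tempfile.TemporaryDirectory(prefix="venv") as tmp_dir:
        input_filename = os.path.join(tmp_dir, "script.in")
        output_filename = os.path.join(tmp_dir, "script.out")
        string_args_filename = os.path.join(tmp_dir, "string_args.txt")
        script_filename = os.path.join(tmp_dir, "script.py")

        with open(input_filename, "wb") as file:
            pickle.dump({"args": op_args, "kwargs": op_kwargs}, file)
        with open(string_args_filename, "w") as file:
            file.write("\n".join(map(str, string_args)))
        with open(script_filename, "w") as file:
            file.write(SCRIPT)

        # sys.executable stands in for the virtualenv interpreter (f"{tmp_dir}/bin/python").
        subprocess.run(
            [sys.executable, script_filename, input_filename, output_filename, string_args_filename],
            check=True,
        )

        # An empty output file means the script wrote no result.
        if os.stat(output_filename).st_size == 0:
            return None
        with open(output_filename, "rb") as file:
            return pickle.load(file)
```

   Because both sides agree only on argument positions in `argv`, reordering the `_write_*` calls (as the hunk does) is harmless; what must stay in sync is the order of the paths passed to the interpreter.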







[GitHub] [airflow] feluelle commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r455724510



##########
File path: tests/operators/test_python.py
##########
@@ -1222,23 +1230,42 @@ def f(templates_dict):
         self._run_as_operator(f, templates_dict={'ds': '{{ ds }}'})
 
     def test_airflow_context(self):
-        def f(**context):
-            # not serializable
-            assert 'var' not in context
-            assert 'task_instance' not in context
-            assert 'ti' not in context
-            # require airflow
-            assert 'conf' in context
-            assert 'dag' in context
-            assert 'dag_run' in context
-            assert 'macros' in context
-            assert 'task' in context
-            # require airflow or pendulum
-            assert 'execution_date' in context
-            assert 'next_execution_date' in context
-            assert 'prev_execution_date' in context
-            assert 'prev_execution_date_success' in context
-            assert 'prev_start_date_success' in context
+        def f(
+            # basic
+            ds_nodash,
+            inlets,
+            next_ds,
+            next_ds_nodash,
+            outlets,
+            params,
+            prev_ds,
+            prev_ds_nodash,
+            run_id,
+            task_instance_key_str,
+            test_mode,
+            tomorrow_ds,
+            tomorrow_ds_nodash,
+            ts,
+            ts_nodash,
+            ts_nodash_with_tz,
+            yesterday_ds,
+            yesterday_ds_nodash,
+            # pendulum-specific
+            execution_date,
+            next_execution_date,
+            prev_execution_date,
+            prev_execution_date_success,
+            prev_start_date_success,
+            # airflow-specific
+            macros,
+            conf,
+            dag,
+            dag_run,
+            task,
+            # other
+            **context
+        ):  # pylint: disable=unused-argument,too-many-arguments,too-many-locals
+            pass

Review comment:
       @turbaszek what do you think of this? I am testing it by forcing the given keys to be unpacked into named parameters.
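   The idea behind the new `test_airflow_context` is that naming every expected context key as a parameter turns a missing key into a `TypeError` at call time, while `**context` absorbs any extra keys. A reduced sketch with three of those parameters, where `call_with` is a hypothetical helper:

```python
from typing import Any, Dict


def f(ds_nodash, execution_date, dag, **context):
    # Extra keys land in **context; missing named keys make the call itself fail.
    return sorted(context)


def call_with(context: Dict[str, Any]) -> str:
    try:
        f(**context)
    except TypeError as err:
        return f"failed: {err}"
    return "ok"
```

   One trade-off compared with the old assertions: the call reports which keys are missing, but keys that should have been dropped (such as `ti` or `var`) would pass silently into `**context`.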

Review comment:
       Also, if there isn't anything more I could do, I would love to get this merged. :)



