You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/06/23 08:01:23 UTC

[GitHub] [airflow] feluelle commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

feluelle commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r444036138



##########
File path: airflow/operators/python.py
##########
@@ -269,140 +281,132 @@ def __init__(  # pylint: disable=too-many-arguments
             templates_exts=templates_exts,
             *args,
             **kwargs)
-        self.requirements = requirements or []
+        self.requirements = list(requirements or [])
         self.string_args = string_args or []
         self.python_version = python_version
         self.use_dill = use_dill
         self.system_site_packages = system_site_packages
-        # check that dill is present if needed
-        dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
-                                   self.requirements)
-        if (not system_site_packages) and use_dill and not any(dill_in_requirements):
-            raise AirflowException('If using dill, dill must be in the environment ' +
-                                   'either via system_site_packages or requirements')
-        # check that a function is passed, and that it is not a lambda
-        if (not isinstance(self.python_callable,
-                           types.FunctionType) or (self.python_callable.__name__ ==
-                                                   (lambda x: 0).__name__)):
-            raise AirflowException('{} only supports functions for python_callable arg'.format(
-                self.__class__.__name__))
-        # check that args are passed iff python major version matches
-        if (python_version is not None and
-           str(python_version)[0] != str(sys.version_info[0]) and
-           self._pass_op_args()):
-            raise AirflowException("Passing op_args or op_kwargs is not supported across "
-                                   "different Python major versions "
-                                   "for PythonVirtualenvOperator. "
-                                   "Please use string_args.")
+        if not self.system_site_packages and self.use_dill and 'dill' not in self.requirements:
+            self.requirements.append('dill')
+        self.pickling_library = dill if self.use_dill else pickle
 
     def execute_callable(self):
         with TemporaryDirectory(prefix='venv') as tmp_dir:
             if self.templates_dict:
                 self.op_kwargs['templates_dict'] = self.templates_dict
-            # generate filenames
+
             input_filename = os.path.join(tmp_dir, 'script.in')
             output_filename = os.path.join(tmp_dir, 'script.out')
             string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
             script_filename = os.path.join(tmp_dir, 'script.py')
 
-            # set up virtualenv
-            python_bin = 'python' + str(self.python_version) if self.python_version else None
             prepare_virtualenv(
                 venv_directory=tmp_dir,
-                python_bin=python_bin,
+                python_bin=f'python{self.python_version}' if self.python_version else None,
                 system_site_packages=self.system_site_packages,
-                requirements=self.requirements,
+                requirements=self.requirements
             )
 
             self._write_args(input_filename)
-            self._write_script(script_filename)
             self._write_string_args(string_args_filename)
+            self._write_script(script_filename)
+
+            execute_in_subprocess(cmd=[
+                f'{tmp_dir}/bin/python',
+                script_filename,
+                input_filename,
+                output_filename,
+                string_args_filename
+            ])
 
-            # execute command in virtualenv
-            execute_in_subprocess(
-                self._generate_python_cmd(tmp_dir,
-                                          script_filename,
-                                          input_filename,
-                                          output_filename,
-                                          string_args_filename))
             return self._read_result(output_filename)
 
-    def _pass_op_args(self):
-        # we should only pass op_args if any are given to us
-        return len(self.op_args) + len(self.op_kwargs) > 0
+    def _write_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            if self.op_kwargs:
+                # some items from context can't be loaded in virtual env
+                self._keep_serializable_op_kwargs()
+            print(self.op_kwargs)
+            with open(filename, 'wb') as file:
+                self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
+
+    def _keep_serializable_op_kwargs(self):
+        # Remove unserializable objects
+        # otherwise "KeyError: 'Variable __getstate__ does not exist'" would be raised.
+        self.op_kwargs.pop('var', None)
+        # otherwise "TypeError: cannot serialize '_io.FileIO' object" would be raised.
+        self.op_kwargs.pop('task_instance', None)
+        self.op_kwargs.pop('ti', None)
+
+        if self.system_site_packages or 'apache-airflow' in self.requirements:
+            # Everything else can be serialized, since it is expected to run in an airflow env.
+            return
+
+        # No access to host packages and no apache-airflow installed.
+        # Remove airflow specific context
+        # otherwise "ModuleNotFoundError: No module named 'airflow'" would be raised.
+        self.op_kwargs.pop('macros', None)
+        self.op_kwargs.pop('conf', None)
+        self.op_kwargs.pop('dag', None)
+        self.op_kwargs.pop('dag_run', None)
+        self.op_kwargs.pop('task', None)
+
+        if 'pendulum' in self.requirements and 'lazy_object_proxy' in self.requirements:
+            # ..but pendulum is installed so keep pendulum date objects
+            # Note: 'lazy_object_proxy' is needed for this to work.
+            return
+
+        # No pendulum is installed either. So remove pendulum specific context.
+        # otherwise "ModuleNotFoundError: No module named 'pendulum'" would be raised.
+        self.op_kwargs.pop('execution_date', None)
+        self.op_kwargs.pop('next_execution_date', None)
+        self.op_kwargs.pop('prev_execution_date', None)
+        self.op_kwargs.pop('prev_execution_date_success', None)
+        self.op_kwargs.pop('prev_start_date_success', None)
 
     def _write_string_args(self, filename):
-        # writes string_args to a file, which are read line by line
         with open(filename, 'w') as file:
             file.write('\n'.join(map(str, self.string_args)))
 
-    def _write_args(self, input_filename):
-        # serialize args to file
-        if self._pass_op_args():
-            with open(input_filename, 'wb') as file:
-                arg_dict = ({'args': self.op_args, 'kwargs': self.op_kwargs})
-                if self.use_dill:
-                    dill.dump(arg_dict, file)
-                else:
-                    pickle.dump(arg_dict, file)
-
-    def _read_result(self, output_filename):
-        if os.stat(output_filename).st_size == 0:
+    def _code__imports(self):
+        return f'import {self.pickling_library.__name__}\n' \
+               'import sys\n'
+
+    def _code__read_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            return f'with open({filename}, "rb") as file:\n' \
+                   f'    arg_dict = {self.pickling_library.__name__}.load(file)\n'
+        return 'arg_dict = {"args": [], "kwargs": {}}\n'
+
+    def _code__read_string_args(self, filename):
+        return f'with open({filename}, "r") as file:\n' \
+               '    virtualenv_string_args = list(map(lambda x: x.strip(), list(file)))\n'
+
+    def _code__write_output(self, filename):
+        return f'with open({filename}, "wb") as file:\n' \
+               f'    if res: {self.pickling_library.__name__}.dump(res, file)\n'
+
+    def _code__call_script(self, arg_dict):
+        return f'{dedent(inspect.getsource(self.python_callable))}\n' \
+               f'res = {self.python_callable.__name__}(*{arg_dict}["args"], **{arg_dict}["kwargs"])\n'
+
+    def _write_script(self, filename):
+        with open(filename, 'w') as file:
+            python_code = f"{self._code__imports()}" \
+                          f"{self._code__read_args(filename='sys.argv[1]')}" \
+                          f"{self._code__read_string_args(filename='sys.argv[3]')}" \
+                          f"{self._code__call_script(arg_dict='arg_dict')}" \
+                          f"{self._code__write_output(filename='sys.argv[2]')}"
+            self.log.debug('Writing code to file\n %s', python_code)
+            file.write(python_code)

Review comment:
       Looks good 👍 How do you use it then?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org