You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/10/14 14:41:01 UTC

[GitHub] [airflow] ashb commented on a change in pull request #10617: ensure a zipped DAG can locate the modules it provides

ashb commented on a change in pull request #10617:
URL: https://github.com/apache/airflow/pull/10617#discussion_r504734126



##########
File path: airflow/models/dagbag.py
##########
@@ -299,48 +300,120 @@ def _load_modules_from_file(self, filepath, safe_mode):
 
     def _load_modules_from_zip(self, filepath, safe_mode):
         mods = []
+
+        mod_overrides = []
         current_zip_file = zipfile.ZipFile(filepath)
+
+        # we do a first pass within the zip, in order to know which module names are provided by the zip itself at the
+        # root. Any such module name gets removed from the system path in order to ensure a successful override
+        # by the packaged zip, should the user want it.
         for zip_info in current_zip_file.infolist():
             head, _ = os.path.split(zip_info.filename)
             mod_name, ext = os.path.splitext(zip_info.filename)
             if ext not in [".py", ".pyc"]:
                 continue
             if head:
                 continue
+            mod_overrides.append(mod_name.replace('/','.').replace('\\', '.'))
 
-            if mod_name == '__init__':
-                self.log.warning("Found __init__.%s at root of %s", ext, filepath)
-
-            self.log.debug("Reading %s from %s", zip_info.filename, filepath)
+        modules_initial = set(sys.modules.keys())
 
-            if not might_contain_dag(zip_info.filename, safe_mode, current_zip_file):
-                # todo: create ignore list
-                # Don't want to spam user with skip messages
-                if not self.has_logged or True:
-                    self.has_logged = True
-                    self.log.info(
-                        "File %s:%s assumed to contain no DAGs. Skipping.",
-                        filepath, zip_info.filename
-                    )
+        for mod_key in modules_initial:
+            if mod_key in mod_overrides:
+                self.log.info("dropping module %s due to presence in mod_overrides" % mod_key)
+                del sys.modules[mod_key]
                 continue
+            if not self._is_system_module(mod_key):
+                self.log.info("dropping module %s due to _is_system_module=False" % mod_key)
+                del sys.modules[mod_key]
 
-            if mod_name in sys.modules:
-                del sys.modules[mod_name]
+        modules_before = set(sys.modules.keys())
+        path_before = list(sys.path)
 
-            try:
-                sys.path.insert(0, filepath)
-                current_module = importlib.import_module(mod_name)
-                mods.append(current_module)
-            except Exception as e:  # pylint: disable=broad-except
-                self.log.exception("Failed to import: %s", filepath)
-                if self.dagbag_import_error_tracebacks:
-                    self.import_errors[filepath] = traceback.format_exc(
-                        limit=-self.dagbag_import_error_traceback_depth
-                    )
-                else:
-                    self.import_errors[filepath] = str(e)
+        try:
+            for zip_info in current_zip_file.infolist():
+                head, _ = os.path.split(zip_info.filename)
+                mod_name, ext = os.path.splitext(zip_info.filename)
+                if ext not in [".py", ".pyc"]:
+                    continue
+                if head:
+                    continue
+
+                if mod_name == '__init__':
+                    self.log.warning("Found __init__.%s at root of %s", ext, filepath)
+
+                self.log.debug("Reading %s from %s", zip_info.filename, filepath)
+
+                if not might_contain_dag(zip_info.filename, safe_mode, current_zip_file):
+                    # todo: create ignore list
+                    # Don't want to spam user with skip messages
+                    if not self.has_logged or True:
+                        self.has_logged = True
+                        self.log.info(
+                            "File %s:%s assumed to contain no DAGs. Skipping.",
+                            filepath, zip_info.filename
+                        )
+                    continue
+
+                if mod_name in sys.modules:
+                    del sys.modules[mod_name]
+
+                try:
+                    if not filepath in sys.path:
+                        sys.path.insert(0, filepath)
+                    current_module = importlib.import_module(mod_name)
+                    mods.append(current_module)
+                except Exception as e:  # pylint: disable=broad-except
+                    self.log.exception("Failed to import: %s", filepath)
+                    if self.dagbag_import_error_tracebacks:
+                        self.import_errors[filepath] = traceback.format_exc(
+                            limit=-self.dagbag_import_error_traceback_depth
+                        )
+                    else:
+                        self.import_errors[filepath] = str(e)
+        finally:
+            loaded_modules = set(sys.modules.keys()) - modules_before
+            for mod_key in loaded_modules:
+                del sys.modules[mod_key]
+
+            sys.path = path_before
         return mods
 
+    def _is_system_module(self, module_name):
+        """
+        :return: True if the module comes from the platform (and ought to stay loaded) or might come from
+        the actions of a DAG (in which case it's safer to unload it before reloading it)
+        """
+        if module_name in ["sys", "types", "__main__", "websocket", "airflow"]:

Review comment:
       You're going to have to explain this in a lot more detail -- there are many, many more system modules than this :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org