You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/14 07:33:06 UTC

[GitHub] [airflow] casra-developers commented on a change in pull request #16110: Added windows extensions

casra-developers commented on a change in pull request #16110:
URL: https://github.com/apache/airflow/pull/16110#discussion_r649694640



##########
File path: airflow/utils/process_utils.py
##########
@@ -79,7 +86,7 @@ def signal_procs(sig):
             else:
                 raise
 
-    if pgid == os.getpgid(0):
+    if IS_WINDOWS and pgid == os.getpid() or not IS_WINDOWS and pgid == os.getpgid(0):

Review comment:
       Parentheses would be redundant, because `and` has higher precedence than `or` (just as in Boolean algebra a*b + c*d is read as (a*b) + (c*d)).

##########
File path: airflow/utils/process_utils.py
##########
@@ -155,11 +162,13 @@ def execute_interactive(cmd: List[str], **kwargs):
     """
     log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))
 
-    old_tty = termios.tcgetattr(sys.stdin)
-    tty.setraw(sys.stdin.fileno())
+    if not IS_WINDOWS:
+        old_tty = termios.tcgetattr(sys.stdin)
+        tty.setraw(sys.stdin.fileno())
+
+        # open pseudo-terminal to interact with subprocess
+        master_fd, slave_fd = pty.openpty()

Review comment:
       True, which is why the patch only works when this is used as a Dask worker. I can add a check that raises a `NotImplementedError` if the function is called on Windows. Would that be enough for the time being, or can you tell me how to implement the Windows counterpart?

##########
File path: airflow/utils/timeout.py
##########
@@ -31,20 +34,34 @@ def __init__(self, seconds=1, error_message='Timeout'):
         self.seconds = seconds
         self.error_message = error_message + ', PID: ' + str(os.getpid())
 
-    def handle_timeout(self, signum, frame):  # pylint: disable=unused-argument
+    def handle_timeout(self, *args):  # pylint: disable=unused-argument
         """Logs information and raises AirflowTaskTimeout."""
         self.log.error("Process timed out, PID: %s", str(os.getpid()))
         raise AirflowTaskTimeout(self.error_message)
 
     def __enter__(self):
         try:
-            signal.signal(signal.SIGALRM, self.handle_timeout)
-            signal.setitimer(signal.ITIMER_REAL, self.seconds)
+            if IS_WINDOWS:
+                if hasattr(self, TIMER_THREAD_ATTR) and getattr(self, TIMER_THREAD_ATTR) is not None:
+                    getattr(self, TIMER_THREAD_ATTR).cancel()
+                timer = Timer(self.seconds, self.handle_timeout)
+                setattr(self, TIMER_THREAD_ATTR, timer)
+                timer.start()

Review comment:
       I really like this approach, much cleaner than the current one. :)

##########
File path: setup.cfg
##########
@@ -156,6 +156,9 @@ install_requires =
     typing-extensions>=3.7.4;python_version<"3.8"
     unicodecsv>=0.14.1
     werkzeug~=1.0, >=1.0.1
+    # needed for generating virtual python environments when running tasks
+    virtualenv>=20.4.3
+    psycopg2>=2.8.6

Review comment:
       During testing we got an exception because the module was missing. Maybe there is a dependency that only needs it on Windows. Should I remove it and install it manually when needed, or is there a way to install it only on Windows?

##########
File path: airflow/utils/timeout.py
##########
@@ -22,12 +22,43 @@
 from airflow.utils.platform import IS_WINDOWS
 from airflow.exceptions import AirflowTaskTimeout
 from airflow.utils.log.logging_mixin import LoggingMixin
+from typing import ContextManager, Type
 
-TIMER_THREAD_ATTR = '_timer_thread'
+_timeout = ContextManager[None]
 
 
-class timeout(LoggingMixin):  # pylint: disable=invalid-name
-    """To be used in a ``with`` block and timeout its content."""
+class _timeout_windows(_timeout, LoggingMixin):
+
+    def __init__(self, seconds=1, error_message='Timeout'):
+        super().__init__()
+        self._timer: Timer = None

Review comment:
       Can you elaborate why?

##########
File path: airflow/utils/timeout.py
##########
@@ -22,12 +22,43 @@
 from airflow.utils.platform import IS_WINDOWS
 from airflow.exceptions import AirflowTaskTimeout
 from airflow.utils.log.logging_mixin import LoggingMixin
+from typing import ContextManager, Type
 
-TIMER_THREAD_ATTR = '_timer_thread'
+_timeout = ContextManager[None]
 
 
-class timeout(LoggingMixin):  # pylint: disable=invalid-name
-    """To be used in a ``with`` block and timeout its content."""
+class _timeout_windows(_timeout, LoggingMixin):
+
+    def __init__(self, seconds=1, error_message='Timeout'):
+        super().__init__()
+        self._timer: Timer = None
+        self.seconds = seconds
+        self.error_message = error_message + ': Operation timed out.'

Review comment:
       I omitted it because we are using threading here, but it can be added back, since the PID of the starting process is still available.

##########
File path: airflow/utils/timeout.py
##########
@@ -22,12 +22,43 @@
 from airflow.utils.platform import IS_WINDOWS
 from airflow.exceptions import AirflowTaskTimeout
 from airflow.utils.log.logging_mixin import LoggingMixin
+from typing import ContextManager, Type
 
-TIMER_THREAD_ATTR = '_timer_thread'
+_timeout = ContextManager[None]
 
 
-class timeout(LoggingMixin):  # pylint: disable=invalid-name
-    """To be used in a ``with`` block and timeout its content."""
+class _timeout_windows(_timeout, LoggingMixin):
+
+    def __init__(self, seconds=1, error_message='Timeout'):
+        super().__init__()
+        self._timer: Timer = None
+        self.seconds = seconds
+        self.error_message = error_message + ': Operation timed out.'
+
+    def handle_timeout(self, *args):  # pylint: disable=unused-argument
+        """Logs information and raises AirflowTaskTimeout."""
+        self.log.error("Operation timed out.")
+        raise AirflowTaskTimeout(self.error_message)
+
+    def __enter__(self):
+        try:
+            if self._timer:
+                self._timer.cancel()
+            self._timer = Timer(self.seconds, self.handle_timeout)
+            self._timer.start()
+        except ValueError:

Review comment:
       Good point. I removed it from `__enter__` and `__exit__`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org