Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/03/31 13:41:40 UTC

[GitHub] [airflow] ashb opened a new pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

ashb opened a new pull request #15112:
URL: https://github.com/apache/airflow/pull/15112


   Closes #7935, #15037 (I hope!) 
   
     There have been long-standing issues, which we haven't been able to
     track down, where the scheduler would "stop responding".
   
    Someone was able to catch the scheduler in this state in 2.0.1 and inspect
    it with py-spy (thanks, MatthewRBruce!)
   
    The stack traces (slightly shortened) were:
   
     ```
     Process 6: /usr/local/bin/python /usr/local/bin/airflow scheduler
     Python v3.8.7 (/usr/local/bin/python3.8)
     Thread 0x7FF5C09C8740 (active): "MainThread"
        _send (multiprocessing/connection.py:368)
        _send_bytes (multiprocessing/connection.py:411)
        send (multiprocessing/connection.py:206)
        send_callback_to_execute (airflow/utils/dag_processing.py:283)
        _send_dag_callbacks_to_processor (airflow/jobs/scheduler_job.py:1795)
        _schedule_dag_run (airflow/jobs/scheduler_job.py:1762)

     Process 77: airflow scheduler -- DagFileProcessorManager
     Python v3.8.7 (/usr/local/bin/python3.8)
     Thread 0x7FF5C09C8740 (active): "MainThread"
        _send (multiprocessing/connection.py:368)
        _send_bytes (multiprocessing/connection.py:405)
        send (multiprocessing/connection.py:206)
        _run_parsing_loop (airflow/utils/dag_processing.py:698)
        start (airflow/utils/dag_processing.py:596)
     ```
   
     What this shows is that both processes are stuck trying to send data to
     each other, but neither can proceed: both buffers are full, and since
     both sides are trying to send, neither is ever going to read and make
     more space in the buffer. A classic deadlock!
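
     The condition is easy to reproduce outside Airflow. Here is a minimal,
     self-contained sketch (not Airflow code; the payload size is arbitrary,
     just bigger than the OS socket buffer), in which both ends of a duplex
     pipe send without reading:

     ```python
     import multiprocessing
     import threading

     a, b = multiprocessing.Pipe()
     payload = b"x" * 1_000_000  # comfortably larger than the socket buffer

     def flood(conn, name):
         conn.send_bytes(payload)  # blocks once the peer's buffer is full
         print(name, "finished sending")  # never reached while deadlocked

     t1 = threading.Thread(target=flood, args=(a, "A"), daemon=True)
     t2 = threading.Thread(target=flood, args=(b, "B"), daemon=True)
     t1.start()
     t2.start()
     t1.join(timeout=2)
     t2.join(timeout=2)

     # Each end is blocked in send_bytes(), waiting for the other to read.
     print("deadlocked:", t1.is_alive() and t2.is_alive())  # True
     ```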
   
     The fix for this is twofold:
   
    1) Enable non-blocking IO on the DagFileProcessorManager side.
   
        The only thing the Manager sends back up the pipe is (now, as of 2.0)
       the DagParsingStat object, and the scheduler will happily continue
       without receiving these, so if a send would block, it is simply
       better to ignore the error, continue the loop, and try sending again
       later (see the first sketch after this list).
   
    2) Reduce the size of DagParsingStat
   
        In the case of a large number of dag files we included the full path
       of each and every one in _each_ parsing stat. The scheduler did
       nothing with this field, so the object was larger than it needed to
       be, and making it such a large object increased the likelihood of
       hitting this send-buffer-full deadlock (see the second sketch below).
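
     For illustration, a minimal sketch of the non-blocking pattern in fix 1
     (standalone code, not the PR's exact implementation; `os.set_blocking`
     on the connection's file descriptor is one way to get this behaviour):

     ```python
     import multiprocessing
     import os

     scheduler_conn, manager_conn = multiprocessing.Pipe()

     # Put the manager's end of the pipe into non-blocking mode.
     os.set_blocking(manager_conn.fileno(), False)

     stat = ("parsing-stat",)  # stand-in for a DagParsingStat
     try:
         manager_conn.send(stat)
     except BlockingIOError:
         # The send buffer is full because the scheduler isn't reading right
         # now. The stat is informational only, so drop it and retry on the
         # next pass through the parsing loop. Any other error propagates
         # and kills the manager; the scheduler notices and restarts it.
         pass
     ```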
   
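     And a rough illustration of why fix 2 matters (the file count and path
     format are made up; only the relative pickled sizes are the point):

     ```python
     import pickle
     from typing import List, NamedTuple

     class OldStat(NamedTuple):
         file_paths: List[str]
         max_runs_reached: bool
         all_files_processed: bool

     class NewStat(NamedTuple):
         num_file_paths: int
         max_runs_reached: bool
         all_files_processed: bool

     paths = [f"/opt/airflow/dags/dag_{i}.py" for i in range(5000)]

     # With thousands of DAG files the old stat pickles to hundreds of
     # kilobytes, easily filling a small pipe buffer; the new one is tiny.
     print(len(pickle.dumps(OldStat(paths, False, False))))
     print(len(pickle.dumps(NewStat(len(paths), False, False))))
     ```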
   





[GitHub] [airflow] ashb commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605076144



##########
File path: tests/utils/test_dag_processing.py
##########
@@ -521,17 +523,93 @@ def test_dag_with_system_exit(self):
 
         manager._run_parsing_loop()
 
+        result = None
         while parent_pipe.poll(timeout=None):
             result = parent_pipe.recv()
             if isinstance(result, DagParsingStat) and result.done:
                 break
 
         # Three files in folder should be processed
-        assert len(result.file_paths) == 3
+        assert result.num_file_paths == 3
 
         with create_session() as session:
             assert session.query(DagModel).get(dag_id) is not None
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @pytest.mark.backend("mysql", "postgres")
+    def test_pipe_full_deadlock(self):
+        dag_filepath = TEST_DAG_FOLDER / "test_scheduler_dags.py"
+
+        child_pipe, parent_pipe = multiprocessing.Pipe()
+
+        # Shrink the buffers to exacerbate the problem!
+        for fd in (parent_pipe.fileno(),):
+            sock = socket.socket(fileno=fd)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
+            sock.detach()
+
+        exit_event = threading.Event()
+
+        # To test this behaviour we need something that continually fills the
+        # parent pipe's bufffer (and keeps it full).
+        def keep_pipe_full(pipe, exit_event):
+            import logging
+
+            n = 0
+            while True:
+                if exit_event.is_set():
+                    break
+
+                req = CallbackRequest(str(dag_filepath))
+                try:
+                    logging.debug("Sending CallbackRequests %d", n + 1)
+                    pipe.send(req)
+                except TypeError:
+                    # This is actually the error you get when the parent pipe
+                    # is closed! Nicely handled, eh?
+                    break
+                except OSError:
+                    break
+                n += 1
+                logging.debug("   Sent %d CallbackRequests", n)
+
+        thread = threading.Thread(target=keep_pipe_full, args=(parent_pipe, exit_event))
+
+        fake_processors = []
+
+        def fake_processor_factory(*args, **kwargs):
+            nonlocal fake_processors
+            processor = FakeDagFileProcessorRunner._fake_dag_processor_factory(*args, **kwargs)
+            fake_processors.append(processor)
+            return processor
+
+        manager = DagFileProcessorManager(
+            dag_directory=dag_filepath,
+            dag_ids=[],
+            # A reasonable large number to ensure that we trigger the deadlock
+            max_runs=100,
+            processor_factory=fake_processor_factory,
+            processor_timeout=timedelta(seconds=5),
+            signal_conn=child_pipe,
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        try:
+            thread.start()
+
+            # If this completes without hanging, then the test is good!
+            manager._run_parsing_loop()

Review comment:
       Oh in tests, yes we probably should. 







[GitHub] [airflow] github-actions[bot] commented on pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#issuecomment-811278484


   [The Workflow run](https://github.com/apache/airflow/actions/runs/705808662) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] ashb commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605040314



##########
File path: tests/utils/test_dag_processing.py
##########
@@ -521,17 +521,97 @@ def test_dag_with_system_exit(self):
 
         manager._run_parsing_loop()
 
+        result = None
         while parent_pipe.poll(timeout=None):
             result = parent_pipe.recv()
             if isinstance(result, DagParsingStat) and result.done:
                 break
 
         # Three files in folder should be processed
-        assert len(result.file_paths) == 3
+        assert result.num_file_paths == 3
 
         with create_session() as session:
             assert session.query(DagModel).get(dag_id) is not None
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @pytest.mark.backend("mysql", "postgres")
+    def test_pipe_full_deadlock(self):
+        import threading
+
+        dag_filepath = TEST_DAG_FOLDER / "test_scheduler_dags.py"
+
+        child_pipe, parent_pipe = multiprocessing.Pipe()
+
+        import socket

Review comment:
       https://github.com/apache/airflow/pull/15112/commits/c70f6460d0bacc6798e890c88735db4cfef67285







[GitHub] [airflow] XD-DENG commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605184402



##########
File path: airflow/utils/dag_processing.py
##########
@@ -141,7 +141,7 @@ def waitable_handle(self):
 class DagParsingStat(NamedTuple):
     """Information on processing progress"""
 
-    file_paths: List[str]
+    num_file_paths: int

Review comment:
       Checking `self._sync_metadata` also leads me to something else:
   
    https://github.com/apache/airflow/blob/1bec3b21266764f367431ab5d9a5b75f52b9b6d2/airflow/utils/dag_processing.py#L323-L326 is doing exactly the same thing as what's already done in `self._process_message()`, which has been invoked just before L323-326.
    
    A clean-up refactor to do? Or have I misunderstood something?







[GitHub] [airflow] kaxil commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605216893



##########
File path: airflow/utils/dag_processing.py
##########
@@ -141,7 +141,7 @@ def waitable_handle(self):
 class DagParsingStat(NamedTuple):
     """Information on processing progress"""
 
-    file_paths: List[str]
+    num_file_paths: int

Review comment:
       lol yeah, good catch @XD-DENG, `self._sync_metadata` is redundant here







[GitHub] [airflow] ashb commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605081881



##########
File path: tests/utils/test_dag_processing.py
##########
@@ -521,17 +523,93 @@ def test_dag_with_system_exit(self):
 
         manager._run_parsing_loop()
 
+        result = None
         while parent_pipe.poll(timeout=None):
             result = parent_pipe.recv()
             if isinstance(result, DagParsingStat) and result.done:
                 break
 
         # Three files in folder should be processed
-        assert len(result.file_paths) == 3
+        assert result.num_file_paths == 3
 
         with create_session() as session:
             assert session.query(DagModel).get(dag_id) is not None
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @pytest.mark.backend("mysql", "postgres")
+    def test_pipe_full_deadlock(self):
+        dag_filepath = TEST_DAG_FOLDER / "test_scheduler_dags.py"
+
+        child_pipe, parent_pipe = multiprocessing.Pipe()
+
+        # Shrink the buffers to exacerbate the problem!
+        for fd in (parent_pipe.fileno(),):
+            sock = socket.socket(fileno=fd)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
+            sock.detach()
+
+        exit_event = threading.Event()
+
+        # To test this behaviour we need something that continually fills the
+        # parent pipe's bufffer (and keeps it full).
+        def keep_pipe_full(pipe, exit_event):
+            import logging

Review comment:
       https://github.com/apache/airflow/pull/15112/commits/5e7405f5574b36e8bc9cf8df0d0d788ba7238ad4







[GitHub] [airflow] ashb commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605039463



##########
File path: airflow/utils/dag_processing.py
##########
@@ -696,11 +706,20 @@ def _run_parsing_loop(self):
             max_runs_reached = self.max_runs_reached()
 
             dag_parsing_stat = DagParsingStat(
-                self._file_paths,
+                len(self._file_paths),
                 max_runs_reached,
                 all_files_processed,
             )
-            self._signal_conn.send(dag_parsing_stat)
+            try:
+                self._signal_conn.send(dag_parsing_stat)

Review comment:
       Since these are local sockets, with the change to non-blocking mode a send will either succeed or fail entirely, "instantly", so we don't need a timeout, no.
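
    A standalone sketch of that instant fail/succeed behaviour (not from the
    thread; `socket.socketpair()` stands in for the pipe's underlying AF_UNIX
    socket):

    ```python
    import os
    import socket
    import time

    a, b = socket.socketpair()  # an AF_UNIX pair, like Pipe() on Linux
    os.set_blocking(a.fileno(), False)

    start = time.monotonic()
    sent = 0
    try:
        while True:
            sent += a.send(b"x" * 4096)  # nobody reads b, so this fills up
    except BlockingIOError:
        pass  # raised the instant the buffer is full; no timeout involved

    print(f"buffered {sent} bytes, failed after {time.monotonic() - start:.4f}s")
    ```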







[GitHub] [airflow] ashb commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605047562



##########
File path: airflow/utils/dag_processing.py
##########
@@ -696,11 +706,20 @@ def _run_parsing_loop(self):
             max_runs_reached = self.max_runs_reached()
 
             dag_parsing_stat = DagParsingStat(
-                self._file_paths,
+                len(self._file_paths),
                 max_runs_reached,
                 all_files_processed,
             )
-            self._signal_conn.send(dag_parsing_stat)
+            try:
+                self._signal_conn.send(dag_parsing_stat)

Review comment:
       On Linux these are AF_UNIX sockets too, so it's even harder for a send to not fail or succeed instantly in non-blocking mode, I think.







[GitHub] [airflow] ashb commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605040858



##########
File path: airflow/utils/dag_processing.py
##########
@@ -696,11 +706,20 @@ def _run_parsing_loop(self):
             max_runs_reached = self.max_runs_reached()
 
             dag_parsing_stat = DagParsingStat(
-                self._file_paths,
+                len(self._file_paths),
                 max_runs_reached,
                 all_files_processed,
             )
-            self._signal_conn.send(dag_parsing_stat)
+            try:
+                self._signal_conn.send(dag_parsing_stat)
+            except BlockingIOError:

Review comment:
       I don't think so, no. Anything else is an "error" condition, and we want to die (and the scheduler will notice and restart the manager process)







[GitHub] [airflow] ashb commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605027200



##########
File path: tests/utils/test_dag_processing.py
##########
@@ -521,17 +521,97 @@ def test_dag_with_system_exit(self):
 
         manager._run_parsing_loop()
 
+        result = None
         while parent_pipe.poll(timeout=None):
             result = parent_pipe.recv()
             if isinstance(result, DagParsingStat) and result.done:
                 break
 
         # Three files in folder should be processed
-        assert len(result.file_paths) == 3
+        assert result.num_file_paths == 3
 
         with create_session() as session:
             assert session.query(DagModel).get(dag_id) is not None
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @pytest.mark.backend("mysql", "postgres")
+    def test_pipe_full_deadlock(self):
+        import threading
+
+        dag_filepath = TEST_DAG_FOLDER / "test_scheduler_dags.py"
+
+        child_pipe, parent_pipe = multiprocessing.Pipe()
+
+        import socket

Review comment:
       Yes, let's move it.







[GitHub] [airflow] kaxil commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605056899



##########
File path: tests/utils/test_dag_processing.py
##########
@@ -521,17 +523,93 @@ def test_dag_with_system_exit(self):
 
         manager._run_parsing_loop()
 
+        result = None
         while parent_pipe.poll(timeout=None):
             result = parent_pipe.recv()
             if isinstance(result, DagParsingStat) and result.done:
                 break
 
         # Three files in folder should be processed
-        assert len(result.file_paths) == 3
+        assert result.num_file_paths == 3
 
         with create_session() as session:
             assert session.query(DagModel).get(dag_id) is not None
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @pytest.mark.backend("mysql", "postgres")
+    def test_pipe_full_deadlock(self):
+        dag_filepath = TEST_DAG_FOLDER / "test_scheduler_dags.py"
+
+        child_pipe, parent_pipe = multiprocessing.Pipe()
+
+        # Shrink the buffers to exacerbate the problem!
+        for fd in (parent_pipe.fileno(),):
+            sock = socket.socket(fileno=fd)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
+            sock.detach()
+
+        exit_event = threading.Event()
+
+        # To test this behaviour we need something that continually fills the
+        # parent pipe's bufffer (and keeps it full).
+        def keep_pipe_full(pipe, exit_event):
+            import logging

Review comment:
       and this too







[GitHub] [airflow] ashb commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605215817



##########
File path: airflow/utils/dag_processing.py
##########
@@ -141,7 +141,7 @@ def waitable_handle(self):
 class DagParsingStat(NamedTuple):
     """Information on processing progress"""
 
-    file_paths: List[str]
+    num_file_paths: int

Review comment:
       Yeah, that looks like a bug. PR comin.







[GitHub] [airflow] ashb commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605198494



##########
File path: airflow/utils/dag_processing.py
##########
@@ -141,7 +141,7 @@ def waitable_handle(self):
 class DagParsingStat(NamedTuple):
     """Information on processing progress"""
 
-    file_paths: List[str]
+    num_file_paths: int

Review comment:
       Yes, done. It was only used in one test, and that test was, strictly speaking, no longer testing the right thing, just a likely side-effect.
   
   ```diff
   -        assert result.num_file_paths == 3
   +        assert sum(stat.run_count for stat in manager._file_stats.values()) == 3
   ```
   
   See 4d8da57eb for full diff.
   
   Good call @XD-DENG 







[GitHub] [airflow] XD-DENG commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605210067



##########
File path: airflow/utils/dag_processing.py
##########
@@ -141,7 +141,7 @@ def waitable_handle(self):
 class DagParsingStat(NamedTuple):
     """Information on processing progress"""
 
-    file_paths: List[str]
+    num_file_paths: int

Review comment:
       > Checking `self._sync_metadata` also leads me to something else:
   > 
   > https://github.com/apache/airflow/blob/1bec3b21266764f367431ab5d9a5b75f52b9b6d2/airflow/utils/dag_processing.py#L323-L326
   > 
    > is doing exactly the same thing as what's already done in `self._process_message()`, which has been invoked just before L323-326.
    > A clean-up refactor to do? Or have I misunderstood something?
   
    Thanks @ashb. How about this question?
    
    We can address it separately of course, but I want to confirm whether I overlooked anything here.







[GitHub] [airflow] XD-DENG commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605029331



##########
File path: airflow/utils/dag_processing.py
##########
@@ -696,11 +706,20 @@ def _run_parsing_loop(self):
             max_runs_reached = self.max_runs_reached()
 
             dag_parsing_stat = DagParsingStat(
-                self._file_paths,
+                len(self._file_paths),
                 max_runs_reached,
                 all_files_processed,
             )
-            self._signal_conn.send(dag_parsing_stat)
+            try:
+                self._signal_conn.send(dag_parsing_stat)

Review comment:
       I'm not sure: does applying a timeout context here make sense?
    The reason is that it may try to send and get stuck without failing explicitly, so making it fail after a specific timeout may help.
   
   May be a dumb question.







[GitHub] [airflow] XD-DENG commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605163605



##########
File path: tests/utils/test_dag_processing.py
##########
@@ -521,17 +524,90 @@ def test_dag_with_system_exit(self):
 
         manager._run_parsing_loop()
 
+        result = None
         while parent_pipe.poll(timeout=None):
             result = parent_pipe.recv()
             if isinstance(result, DagParsingStat) and result.done:
                 break
 
         # Three files in folder should be processed
-        assert len(result.file_paths) == 3
+        assert result.num_file_paths == 3
 
         with create_session() as session:
             assert session.query(DagModel).get(dag_id) is not None
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @pytest.mark.backend("mysql", "postgres")
+    @pytest.mark.timeout(60)
+    def test_pipe_full_deadlock(self):
+        dag_filepath = TEST_DAG_FOLDER / "test_scheduler_dags.py"
+
+        child_pipe, parent_pipe = multiprocessing.Pipe()
+
+        # Shrink the buffers to exacerbate the problem!
+        for fd in (parent_pipe.fileno(),):
+            sock = socket.socket(fileno=fd)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
+            sock.detach()
+
+        exit_event = threading.Event()
+
+        # To test this behaviour we need something that continually fills the
+        # parent pipe's bufffer (and keeps it full).

Review comment:
       ```suggestion
           # parent pipe's buffer (and keeps it full).
   ```







[GitHub] [airflow] kaxil commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605056757



##########
File path: tests/utils/test_dag_processing.py
##########
@@ -521,17 +523,93 @@ def test_dag_with_system_exit(self):
 
         manager._run_parsing_loop()
 
+        result = None
         while parent_pipe.poll(timeout=None):
             result = parent_pipe.recv()
             if isinstance(result, DagParsingStat) and result.done:
                 break
 
         # Three files in folder should be processed
-        assert len(result.file_paths) == 3
+        assert result.num_file_paths == 3
 
         with create_session() as session:
             assert session.query(DagModel).get(dag_id) is not None
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @pytest.mark.backend("mysql", "postgres")
+    def test_pipe_full_deadlock(self):
+        dag_filepath = TEST_DAG_FOLDER / "test_scheduler_dags.py"
+
+        child_pipe, parent_pipe = multiprocessing.Pipe()
+
+        # Shrink the buffers to exacerbate the problem!
+        for fd in (parent_pipe.fileno(),):
+            sock = socket.socket(fileno=fd)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
+            sock.detach()
+
+        exit_event = threading.Event()
+
+        # To test this behaviour we need something that continually fills the
+        # parent pipe's bufffer (and keeps it full).
+        def keep_pipe_full(pipe, exit_event):
+            import logging
+
+            n = 0
+            while True:
+                if exit_event.is_set():
+                    break
+
+                req = CallbackRequest(str(dag_filepath))
+                try:
+                    logging.debug("Sending CallbackRequests %d", n + 1)
+                    pipe.send(req)
+                except TypeError:
+                    # This is actually the error you get when the parent pipe
+                    # is closed! Nicely handled, eh?
+                    break
+                except OSError:
+                    break
+                n += 1
+                logging.debug("   Sent %d CallbackRequests", n)
+
+        thread = threading.Thread(target=keep_pipe_full, args=(parent_pipe, exit_event))
+
+        fake_processors = []
+
+        def fake_processor_factory(*args, **kwargs):
+            nonlocal fake_processors
+            processor = FakeDagFileProcessorRunner._fake_dag_processor_factory(*args, **kwargs)
+            fake_processors.append(processor)
+            return processor
+
+        manager = DagFileProcessorManager(
+            dag_directory=dag_filepath,
+            dag_ids=[],
+            # A reasonable large number to ensure that we trigger the deadlock
+            max_runs=100,
+            processor_factory=fake_processor_factory,
+            processor_timeout=timedelta(seconds=5),
+            signal_conn=child_pipe,
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        try:
+            thread.start()
+
+            # If this completes without hanging, then the test is good!
+            manager._run_parsing_loop()
+            exit_event.set()
+        finally:
+            import logging

Review comment:
       This can go at the top too








[GitHub] [airflow] XD-DENG commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605029756



##########
File path: airflow/utils/dag_processing.py
##########
@@ -696,11 +706,20 @@ def _run_parsing_loop(self):
             max_runs_reached = self.max_runs_reached()
 
             dag_parsing_stat = DagParsingStat(
-                self._file_paths,
+                len(self._file_paths),
                 max_runs_reached,
                 all_files_processed,
             )
-            self._signal_conn.send(dag_parsing_stat)
+            try:
+                self._signal_conn.send(dag_parsing_stat)
+            except BlockingIOError:

Review comment:
       Any possibility of another error type that we may want to ignore? Again, may be a dumb question.







[GitHub] [airflow] kaxil commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605026425



##########
File path: tests/utils/test_dag_processing.py
##########
@@ -521,17 +521,97 @@ def test_dag_with_system_exit(self):
 
         manager._run_parsing_loop()
 
+        result = None
         while parent_pipe.poll(timeout=None):
             result = parent_pipe.recv()
             if isinstance(result, DagParsingStat) and result.done:
                 break
 
         # Three files in folder should be processed
-        assert len(result.file_paths) == 3
+        assert result.num_file_paths == 3
 
         with create_session() as session:
             assert session.query(DagModel).get(dag_id) is not None
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @pytest.mark.backend("mysql", "postgres")
+    def test_pipe_full_deadlock(self):
+        import threading
+
+        dag_filepath = TEST_DAG_FOLDER / "test_scheduler_dags.py"
+
+        child_pipe, parent_pipe = multiprocessing.Pipe()
+
+        import socket

Review comment:
       Should we move the imports to the top of the file, or are they intentionally here?







[GitHub] [airflow] ashb commented on pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#issuecomment-811077715


   /cc @uranusjr 





[GitHub] [airflow] ashb commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605077882



##########
File path: tests/utils/test_dag_processing.py
##########
@@ -20,7 +20,9 @@
 import os

Review comment:
       ```suggestion
   import logging
   import os
   ```
   

##########
File path: tests/utils/test_dag_processing.py
##########
@@ -521,17 +523,93 @@ def test_dag_with_system_exit(self):
 
         manager._run_parsing_loop()
 
+        result = None
         while parent_pipe.poll(timeout=None):
             result = parent_pipe.recv()
             if isinstance(result, DagParsingStat) and result.done:
                 break
 
         # Three files in folder should be processed
-        assert len(result.file_paths) == 3
+        assert result.num_file_paths == 3
 
         with create_session() as session:
             assert session.query(DagModel).get(dag_id) is not None
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @pytest.mark.backend("mysql", "postgres")
+    def test_pipe_full_deadlock(self):
+        dag_filepath = TEST_DAG_FOLDER / "test_scheduler_dags.py"
+
+        child_pipe, parent_pipe = multiprocessing.Pipe()
+
+        # Shrink the buffers to exacerbate the problem!
+        for fd in (parent_pipe.fileno(),):
+            sock = socket.socket(fileno=fd)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
+            sock.detach()
+
+        exit_event = threading.Event()
+
+        # To test this behaviour we need something that continually fills the
+        # parent pipe's bufffer (and keeps it full).
+        def keep_pipe_full(pipe, exit_event):
+            import logging
+

Review comment:
       ```suggestion
   ```
   

##########
File path: tests/utils/test_dag_processing.py
##########
@@ -521,17 +523,93 @@ def test_dag_with_system_exit(self):
 
         manager._run_parsing_loop()
 
+        result = None
         while parent_pipe.poll(timeout=None):
             result = parent_pipe.recv()
             if isinstance(result, DagParsingStat) and result.done:
                 break
 
         # Three files in folder should be processed
-        assert len(result.file_paths) == 3
+        assert result.num_file_paths == 3
 
         with create_session() as session:
             assert session.query(DagModel).get(dag_id) is not None
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @pytest.mark.backend("mysql", "postgres")
+    def test_pipe_full_deadlock(self):
+        dag_filepath = TEST_DAG_FOLDER / "test_scheduler_dags.py"
+
+        child_pipe, parent_pipe = multiprocessing.Pipe()
+
+        # Shrink the buffers to exacerbate the problem!
+        for fd in (parent_pipe.fileno(),):
+            sock = socket.socket(fileno=fd)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
+            sock.detach()
+
+        exit_event = threading.Event()
+
+        # To test this behaviour we need something that continually fills the
+        # parent pipe's bufffer (and keeps it full).
+        def keep_pipe_full(pipe, exit_event):
+            import logging
+
+            n = 0
+            while True:
+                if exit_event.is_set():
+                    break
+
+                req = CallbackRequest(str(dag_filepath))
+                try:
+                    logging.debug("Sending CallbackRequests %d", n + 1)
+                    pipe.send(req)
+                except TypeError:
+                    # This is actually the error you get when the parent pipe
+                    # is closed! Nicely handled, eh?
+                    break
+                except OSError:
+                    break
+                n += 1
+                logging.debug("   Sent %d CallbackRequests", n)
+
+        thread = threading.Thread(target=keep_pipe_full, args=(parent_pipe, exit_event))
+
+        fake_processors = []
+
+        def fake_processor_factory(*args, **kwargs):
+            nonlocal fake_processors
+            processor = FakeDagFileProcessorRunner._fake_dag_processor_factory(*args, **kwargs)
+            fake_processors.append(processor)
+            return processor
+
+        manager = DagFileProcessorManager(
+            dag_directory=dag_filepath,
+            dag_ids=[],
+            # A reasonable large number to ensure that we trigger the deadlock
+            max_runs=100,
+            processor_factory=fake_processor_factory,
+            processor_timeout=timedelta(seconds=5),
+            signal_conn=child_pipe,
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        try:
+            thread.start()
+
+            # If this completes without hanging, then the test is good!
+            manager._run_parsing_loop()
+            exit_event.set()
+        finally:
+            import logging
+

Review comment:
       ```suggestion
   ```
   

##########
File path: tests/utils/test_dag_processing.py
##########
@@ -521,17 +523,93 @@ def test_dag_with_system_exit(self):
 
         manager._run_parsing_loop()
 
+        result = None
         while parent_pipe.poll(timeout=None):
             result = parent_pipe.recv()
             if isinstance(result, DagParsingStat) and result.done:
                 break
 
         # Three files in folder should be processed
-        assert len(result.file_paths) == 3
+        assert result.num_file_paths == 3
 
         with create_session() as session:
             assert session.query(DagModel).get(dag_id) is not None
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @pytest.mark.backend("mysql", "postgres")

Review comment:
       ```suggestion
       @pytest.mark.backend("mysql", "postgres")
       @pytest.mark.timeout(60)
   ```
   







[GitHub] [airflow] ashb commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605095651



##########
File path: tests/utils/test_dag_processing.py
##########
@@ -17,10 +17,13 @@
 # under the License.
 
 import multiprocessing
+import logging

Review comment:
       ```suggestion
   import logging
   import multiprocessing
   ```
   







[GitHub] [airflow] kaxil commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605059410



##########
File path: tests/utils/test_dag_processing.py
##########
@@ -521,17 +523,93 @@ def test_dag_with_system_exit(self):
 
         manager._run_parsing_loop()
 
+        result = None
         while parent_pipe.poll(timeout=None):
             result = parent_pipe.recv()
             if isinstance(result, DagParsingStat) and result.done:
                 break
 
         # Three files in folder should be processed
-        assert len(result.file_paths) == 3
+        assert result.num_file_paths == 3
 
         with create_session() as session:
             assert session.query(DagModel).get(dag_id) is not None
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @pytest.mark.backend("mysql", "postgres")
+    def test_pipe_full_deadlock(self):
+        dag_filepath = TEST_DAG_FOLDER / "test_scheduler_dags.py"
+
+        child_pipe, parent_pipe = multiprocessing.Pipe()
+
+        # Shrink the buffers to exacerbate the problem!
+        for fd in (parent_pipe.fileno(),):
+            sock = socket.socket(fileno=fd)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
+            sock.detach()
+
+        exit_event = threading.Event()
+
+        # To test this behaviour we need something that continually fills the
+        # parent pipe's bufffer (and keeps it full).
+        def keep_pipe_full(pipe, exit_event):
+            import logging
+
+            n = 0
+            while True:
+                if exit_event.is_set():
+                    break
+
+                req = CallbackRequest(str(dag_filepath))
+                try:
+                    logging.debug("Sending CallbackRequests %d", n + 1)
+                    pipe.send(req)
+                except TypeError:
+                    # This is actually the error you get when the parent pipe
+                    # is closed! Nicely handled, eh?
+                    break
+                except OSError:
+                    break
+                n += 1
+                logging.debug("   Sent %d CallbackRequests", n)
+
+        thread = threading.Thread(target=keep_pipe_full, args=(parent_pipe, exit_event))
+
+        fake_processors = []
+
+        def fake_processor_factory(*args, **kwargs):
+            nonlocal fake_processors
+            processor = FakeDagFileProcessorRunner._fake_dag_processor_factory(*args, **kwargs)
+            fake_processors.append(processor)
+            return processor
+
+        manager = DagFileProcessorManager(
+            dag_directory=dag_filepath,
+            dag_ids=[],
+            # A reasonable large number to ensure that we trigger the deadlock
+            max_runs=100,
+            processor_factory=fake_processor_factory,
+            processor_timeout=timedelta(seconds=5),
+            signal_conn=child_pipe,
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        try:
+            thread.start()
+
+            # If this completes without hanging, then the test is good!
+            manager._run_parsing_loop()

Review comment:
       Should we add a timeout to this -- so that when the test fails it does not hang the CI?
   
   Instead we'd time out and raise an exception where we can explain further.







[GitHub] [airflow] ashb commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605194036



##########
File path: airflow/utils/dag_processing.py
##########
@@ -141,7 +141,7 @@ def waitable_handle(self):
 class DagParsingStat(NamedTuple):
     """Information on processing progress"""
 
-    file_paths: List[str]
+    num_file_paths: int

Review comment:
       Fair point, yes; this is only used for tests and could possibly be removed. Let me take a look.







[GitHub] [airflow] ashb commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605081483



##########
File path: tests/utils/test_dag_processing.py
##########
@@ -521,17 +523,93 @@ def test_dag_with_system_exit(self):
 
         manager._run_parsing_loop()
 
+        result = None
         while parent_pipe.poll(timeout=None):
             result = parent_pipe.recv()
             if isinstance(result, DagParsingStat) and result.done:
                 break
 
         # Three files in folder should be processed
-        assert len(result.file_paths) == 3
+        assert result.num_file_paths == 3
 
         with create_session() as session:
             assert session.query(DagModel).get(dag_id) is not None
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @pytest.mark.backend("mysql", "postgres")
+    def test_pipe_full_deadlock(self):
+        dag_filepath = TEST_DAG_FOLDER / "test_scheduler_dags.py"
+
+        child_pipe, parent_pipe = multiprocessing.Pipe()
+
+        # Shrink the buffers to exacerbate the problem!
+        for fd in (parent_pipe.fileno(),):
+            sock = socket.socket(fileno=fd)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
+            sock.detach()
+
+        exit_event = threading.Event()
+
+        # To test this behaviour we need something that continually fills the
+        # parent pipe's bufffer (and keeps it full).
+        def keep_pipe_full(pipe, exit_event):
+            import logging
+
+            n = 0
+            while True:
+                if exit_event.is_set():
+                    break
+
+                req = CallbackRequest(str(dag_filepath))
+                try:
+                    logging.debug("Sending CallbackRequests %d", n + 1)
+                    pipe.send(req)
+                except TypeError:
+                    # This is actually the error you get when the parent pipe
+                    # is closed! Nicely handled, eh?
+                    break
+                except OSError:
+                    break
+                n += 1
+                logging.debug("   Sent %d CallbackRequests", n)
+
+        thread = threading.Thread(target=keep_pipe_full, args=(parent_pipe, exit_event))
+
+        fake_processors = []
+
+        def fake_processor_factory(*args, **kwargs):
+            nonlocal fake_processors
+            processor = FakeDagFileProcessorRunner._fake_dag_processor_factory(*args, **kwargs)
+            fake_processors.append(processor)
+            return processor
+
+        manager = DagFileProcessorManager(
+            dag_directory=dag_filepath,
+            dag_ids=[],
+            # A reasonable large number to ensure that we trigger the deadlock
+            max_runs=100,
+            processor_factory=fake_processor_factory,
+            processor_timeout=timedelta(seconds=5),
+            signal_conn=child_pipe,
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        try:
+            thread.start()
+
+            # If this completes without hanging, then the test is good!
+            manager._run_parsing_loop()

Review comment:
       https://github.com/apache/airflow/pull/15112/commits/5e7405f5574b36e8bc9cf8df0d0d788ba7238ad4







[GitHub] [airflow] uranusjr commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605046059



##########
File path: airflow/utils/dag_processing.py
##########
@@ -696,11 +706,20 @@ def _run_parsing_loop(self):
             max_runs_reached = self.max_runs_reached()
 
             dag_parsing_stat = DagParsingStat(
-                self._file_paths,
+                len(self._file_paths),
                 max_runs_reached,
                 all_files_processed,
             )
-            self._signal_conn.send(dag_parsing_stat)
+            try:
+                self._signal_conn.send(dag_parsing_stat)

Review comment:
       It wouldn't be a bad idea to add a timeout, but that's extremely rarely an issue for local sockets (things need to get pretty wrong for that to happen at all). This can always be revisited if someone ever complains.







[GitHub] [airflow] ashb merged pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
ashb merged pull request #15112:
URL: https://github.com/apache/airflow/pull/15112


   





[GitHub] [airflow] XD-DENG commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605181000



##########
File path: airflow/utils/dag_processing.py
##########
@@ -141,7 +141,7 @@ def waitable_handle(self):
 class DagParsingStat(NamedTuple):
     """Information on processing progress"""
 
-    file_paths: List[str]
+    num_file_paths: int

Review comment:
       let's further challenge this part: why do we need `num_file_paths` (or the earlier `file_paths`) at all?
   
    The place where we really need this `DagParsingStat` is `self._sync_metadata`, which doesn't use `num_file_paths` (or the earlier `file_paths`).







[GitHub] [airflow] ashb commented on a change in pull request #15112: Avoid scheduler/parser manager deadlock by using non-blocking IO

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15112:
URL: https://github.com/apache/airflow/pull/15112#discussion_r605082015



##########
File path: tests/utils/test_dag_processing.py
##########
@@ -521,17 +523,93 @@ def test_dag_with_system_exit(self):
 
         manager._run_parsing_loop()
 
+        result = None
         while parent_pipe.poll(timeout=None):
             result = parent_pipe.recv()
             if isinstance(result, DagParsingStat) and result.done:
                 break
 
         # Three files in folder should be processed
-        assert len(result.file_paths) == 3
+        assert result.num_file_paths == 3
 
         with create_session() as session:
             assert session.query(DagModel).get(dag_id) is not None
 
+    @conf_vars({('core', 'load_examples'): 'False'})
+    @pytest.mark.backend("mysql", "postgres")
+    def test_pipe_full_deadlock(self):
+        dag_filepath = TEST_DAG_FOLDER / "test_scheduler_dags.py"
+
+        child_pipe, parent_pipe = multiprocessing.Pipe()
+
+        # Shrink the buffers to exacerbate the problem!
+        for fd in (parent_pipe.fileno(),):
+            sock = socket.socket(fileno=fd)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
+            sock.detach()
+
+        exit_event = threading.Event()
+
+        # To test this behaviour we need something that continually fills the
+        # parent pipe's bufffer (and keeps it full).
+        def keep_pipe_full(pipe, exit_event):
+            import logging
+
+            n = 0
+            while True:
+                if exit_event.is_set():
+                    break
+
+                req = CallbackRequest(str(dag_filepath))
+                try:
+                    logging.debug("Sending CallbackRequests %d", n + 1)
+                    pipe.send(req)
+                except TypeError:
+                    # This is actually the error you get when the parent pipe
+                    # is closed! Nicely handled, eh?
+                    break
+                except OSError:
+                    break
+                n += 1
+                logging.debug("   Sent %d CallbackRequests", n)
+
+        thread = threading.Thread(target=keep_pipe_full, args=(parent_pipe, exit_event))
+
+        fake_processors = []
+
+        def fake_processor_factory(*args, **kwargs):
+            nonlocal fake_processors
+            processor = FakeDagFileProcessorRunner._fake_dag_processor_factory(*args, **kwargs)
+            fake_processors.append(processor)
+            return processor
+
+        manager = DagFileProcessorManager(
+            dag_directory=dag_filepath,
+            dag_ids=[],
+            # A reasonable large number to ensure that we trigger the deadlock
+            max_runs=100,
+            processor_factory=fake_processor_factory,
+            processor_timeout=timedelta(seconds=5),
+            signal_conn=child_pipe,
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        try:
+            thread.start()
+
+            # If this completes without hanging, then the test is good!
+            manager._run_parsing_loop()
+            exit_event.set()
+        finally:
+            import logging

Review comment:
       https://github.com/apache/airflow/pull/15112/commits/5e7405f5574b36e8bc9cf8df0d0d788ba7238ad4






