You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/12/11 15:24:53 UTC

[airflow] branch v2-2-test updated (a4c8201 -> 20ac7c1)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a change to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from a4c8201  Replaced 2.3.3 in entrypoint docs
     new 00652ec  Fix failing main. (#20094)
     new 1b958ca  Fix race condition when starting DagProcessorAgent (#19935)
     new 7dd9923  Move setgid as the first command executed in forked task runner (#20040)
     new 20ac7c1  Fix flaky on_kill (#20054)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/dag_processing/manager.py                  | 10 +++
 airflow/task/task_runner/standard_task_runner.py   | 10 +--
 airflow/utils/process_utils.py                     | 55 ++++++++++++-----
 tests/dag_processing/test_processor.py             | 50 +++++++++++++++
 tests/dags/test_on_kill.py                         | 10 ++-
 .../task/task_runner/test_standard_task_runner.py  | 71 ++++++++++++++--------
 tests/www/views/test_views_rendered.py             |  2 +-
 7 files changed, 160 insertions(+), 48 deletions(-)

[airflow] 01/04: Fix failing main. (#20094)

Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 00652ec12e050cbee7bd68661ee88dac2e64ca53
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Tue Dec 7 07:24:37 2021 +0100

    Fix failing main. (#20094)
    
    (cherry picked from commit 50b72961124db7fc83ab4695a958d009869f4d4c)
---
 tests/www/views/test_views_rendered.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/www/views/test_views_rendered.py b/tests/www/views/test_views_rendered.py
index f88db34..129baf7 100644
--- a/tests/www/views/test_views_rendered.py
+++ b/tests/www/views/test_views_rendered.py
@@ -157,7 +157,7 @@ def test_user_defined_filter_and_macros_raise_error(admin_client, create_dag_run
     assert resp.status_code == 200
 
     resp_html: str = resp.data.decode("utf-8")
-    assert "echo Hello Apache Airflow" in resp_html
+    assert "echo Hello Apache Airflow" not in resp_html
     assert (
         "Webserver does not have access to User-defined Macros or Filters when "
         "Dag Serialization is enabled. Hence for the task that have not yet "

[airflow] 03/04: Move setgid as the first command executed in forked task runner (#20040)

Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7dd992346a5d98582ed03465df42ccdf7b4ba692
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Sat Dec 4 23:38:33 2021 +0100

    Move setgid as the first command executed in forked task runner (#20040)
    
    The runner setgid command was executed after importing several airflow
    imports, which - when executed for the first time could take quite
    some time (possibly even few seconds). The setgid command should be
    done as soon as possible, in case of any errors in the import, it
    would fail and the setgid could be never set.
    
    Also this caused the test_start_and_terminate test to fail in CI
    because the imports could take arbitrary long time (depending on
    parallel tests and whether the imported modules were already
    loaded in the process so setting the gid could be set after more
    than 0.5 seconds.
    
    This change fixes it twofold:
    
    * setgid is moved to be first instruction to be executed (also
      signal handling was moved to before the potentially long
      imports)
    * the test was fixed to wait actively and only fail after the
      timeout of 1s (which should not happen before of the fix above)
    
    Additionally the test was using `task test` command rather than task run,
    and in some circumstances when you tried to run it locally,
    when FORK was disabled (MacOS) the same test could fail with
    a different error because --error-file flag is not defined for
    `task test` command but it is automatically added by the runner.
    
    The task command has been changed to `run'
    
    Fixing this tests caused occasional test_on_kill failure
    which suffered from similar problem and had similar sleep
    implemented.
    
    Thanks to that the test will be usually faster as no significant delays
    will be introduced.
    
    (cherry picked from commit abe01fad324c6b22620685de8b9cf384d8ab0b68)
---
 airflow/task/task_runner/standard_task_runner.py   | 10 +++----
 .../task/task_runner/test_standard_task_runner.py  | 31 +++++++++++++---------
 2 files changed, 23 insertions(+), 18 deletions(-)

diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py
index 3c54249..381cf5a 100644
--- a/airflow/task/task_runner/standard_task_runner.py
+++ b/airflow/task/task_runner/standard_task_runner.py
@@ -52,17 +52,17 @@ class StandardTaskRunner(BaseTaskRunner):
             self.log.info("Started process %d to run task", pid)
             return psutil.Process(pid)
         else:
+            # Start a new process group
+            os.setpgid(0, 0)
             import signal
 
+            signal.signal(signal.SIGINT, signal.SIG_DFL)
+            signal.signal(signal.SIGTERM, signal.SIG_DFL)
+
             from airflow import settings
             from airflow.cli.cli_parser import get_parser
             from airflow.sentry import Sentry
 
-            signal.signal(signal.SIGINT, signal.SIG_DFL)
-            signal.signal(signal.SIGTERM, signal.SIG_DFL)
-            # Start a new process group
-            os.setpgid(0, 0)
-
             # Force a new SQLAlchemy session. We can't share open DB handles
             # between process. The cli code will re-create this as part of its
             # normal startup
diff --git a/tests/task/task_runner/test_standard_task_runner.py b/tests/task/task_runner/test_standard_task_runner.py
index dcb9f88..4be1d66 100644
--- a/tests/task/task_runner/test_standard_task_runner.py
+++ b/tests/task/task_runner/test_standard_task_runner.py
@@ -32,6 +32,7 @@ from airflow.utils import timezone
 from airflow.utils.platform import getuser
 from airflow.utils.session import create_session
 from airflow.utils.state import State
+from airflow.utils.timeout import timeout
 from tests.test_utils.db import clear_db_runs
 
 TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
@@ -77,7 +78,7 @@ class TestStandardTaskRunner:
         local_task_job.task_instance.command_as_list.return_value = [
             'airflow',
             'tasks',
-            'test',
+            'run',
             'test_on_kill',
             'task1',
             '2016-01-01',
@@ -85,14 +86,17 @@ class TestStandardTaskRunner:
 
         runner = StandardTaskRunner(local_task_job)
         runner.start()
-        time.sleep(0.5)
-
-        pgid = os.getpgid(runner.process.pid)
-        assert pgid > 0
-        assert pgid != os.getpgid(0), "Task should be in a different process group to us"
-
-        processes = list(self._procs_in_pgroup(pgid))
-
+        # Wait until process sets its pgid to be equal to pid
+        with timeout(seconds=1):
+            while True:
+                runner_pgid = os.getpgid(runner.process.pid)
+                if runner_pgid == runner.process.pid:
+                    break
+                time.sleep(0.01)
+
+        assert runner_pgid > 0
+        assert runner_pgid != os.getpgid(0), "Task should be in a different process group to us"
+        processes = list(self._procs_in_pgroup(runner_pgid))
         runner.terminate()
 
         for process in processes:
@@ -215,10 +219,11 @@ class TestStandardTaskRunner:
             session.close()  # explicitly close as `create_session`s commit will blow up otherwise
 
         # Wait some time for the result
-        for _ in range(20):
-            if os.path.exists(path):
-                break
-            time.sleep(2)
+        with timeout(seconds=40):
+            while True:
+                if os.path.exists(path):
+                    break
+                time.sleep(0.01)
 
         with open(path) as f:
             assert "ON_KILL_TEST" == f.readline()

[airflow] 04/04: Fix flaky on_kill (#20054)

Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 20ac7c196e9e8ba420fd89ce23c02c6ab6bdb491
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Sun Dec 5 20:06:26 2021 +0100

    Fix flaky on_kill (#20054)
    
    The previous fix in #20040 improved forked tests but also caused
    instability in the "on_kill" test for standard task runner.
    
    This PR fixes the instability by signalling when the task started
    rather than waiting for fixed amount of time and it adds better
    diagnostics for the test.
    
    (cherry picked from commit e2345ffca9013de8dedaa6c75dbecb48c073353f)
---
 tests/dags/test_on_kill.py                         | 10 ++++-
 .../task/task_runner/test_standard_task_runner.py  | 44 ++++++++++++++--------
 2 files changed, 38 insertions(+), 16 deletions(-)

diff --git a/tests/dags/test_on_kill.py b/tests/dags/test_on_kill.py
index 3e53df6..5b6a4e3 100644
--- a/tests/dags/test_on_kill.py
+++ b/tests/dags/test_on_kill.py
@@ -26,6 +26,12 @@ class DummyWithOnKill(DummyOperator):
     def execute(self, context):
         import os
 
+        self.log.info("Signalling that I am running")
+        # signal to the test that we've started
+        with open("/tmp/airflow_on_kill_running", "w") as f:
+            f.write("ON_KILL_RUNNING")
+        self.log.info("Signalled")
+
         # This runs extra processes, so that we can be sure that we correctly
         # tidy up all processes launched by a task when killing
         if not os.fork():
@@ -34,11 +40,13 @@ class DummyWithOnKill(DummyOperator):
 
     def on_kill(self):
         self.log.info("Executing on_kill")
-        with open("/tmp/airflow_on_kill", "w") as f:
+        with open("/tmp/airflow_on_kill_killed", "w") as f:
             f.write("ON_KILL_TEST")
+        self.log.info("Executed on_kill")
 
 
 # DAG tests backfill with pooled tasks
 # Previously backfill would queue the task but never run it
 dag1 = DAG(dag_id='test_on_kill', start_date=datetime(2015, 1, 1))
+
 dag1_task1 = DummyWithOnKill(task_id='task1', dag=dag1, owner='airflow')
diff --git a/tests/task/task_runner/test_standard_task_runner.py b/tests/task/task_runner/test_standard_task_runner.py
index 4be1d66..abd9f71 100644
--- a/tests/task/task_runner/test_standard_task_runner.py
+++ b/tests/task/task_runner/test_standard_task_runner.py
@@ -177,9 +177,14 @@ class TestStandardTaskRunner:
         Test that ensures that clearing in the UI SIGTERMS
         the task
         """
-        path = "/tmp/airflow_on_kill"
+        path_on_kill_running = "/tmp/airflow_on_kill_running"
+        path_on_kill_killed = "/tmp/airflow_on_kill_killed"
         try:
-            os.unlink(path)
+            os.unlink(path_on_kill_running)
+        except OSError:
+            pass
+        try:
+            os.unlink(path_on_kill_killed)
         except OSError:
             pass
 
@@ -205,27 +210,36 @@ class TestStandardTaskRunner:
             runner = StandardTaskRunner(job1)
             runner.start()
 
-            # give the task some time to startup
+            with timeout(seconds=3):
+                while True:
+                    runner_pgid = os.getpgid(runner.process.pid)
+                    if runner_pgid == runner.process.pid:
+                        break
+                    time.sleep(0.01)
+
+            processes = list(self._procs_in_pgroup(runner_pgid))
+
+            logging.info("Waiting for the task to start")
+            with timeout(seconds=4):
+                while True:
+                    if os.path.exists(path_on_kill_running):
+                        break
+                    time.sleep(0.01)
+            logging.info("Task started. Give the task some time to settle")
             time.sleep(3)
-
-            pgid = os.getpgid(runner.process.pid)
-            assert pgid > 0
-            assert pgid != os.getpgid(0), "Task should be in a different process group to us"
-
-            processes = list(self._procs_in_pgroup(pgid))
-
+            logging.info(f"Terminating processes {processes} belonging to {runner_pgid} group")
             runner.terminate()
-
             session.close()  # explicitly close as `create_session`s commit will blow up otherwise
 
-        # Wait some time for the result
-        with timeout(seconds=40):
+        logging.info("Waiting for the on kill killed file to appear")
+        with timeout(seconds=4):
             while True:
-                if os.path.exists(path):
+                if os.path.exists(path_on_kill_killed):
                     break
                 time.sleep(0.01)
+        logging.info("The file appeared")
 
-        with open(path) as f:
+        with open(path_on_kill_killed) as f:
             assert "ON_KILL_TEST" == f.readline()
 
         for process in processes:

[airflow] 02/04: Fix race condition when starting DagProcessorAgent (#19935)

Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 1b958ca85dc9abafb0ad687c0a32992f59ae6186
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Fri Dec 3 00:44:59 2021 +0100

    Fix race condition when starting DagProcessorAgent (#19935)
    
    As described in detail in #19860, there was a race condition in
    starting and terminating DagProcessorAgent that caused us a lot
    of headeaches with flaky test_scheduler_job failures on our CI
    and after long investigation, it turned out to be a race
    condition. Not very likely, but possible to happen in production.
    
    The race condition involved starting DagProcessorAgent via
    multiprocessing, where the first action of the agent was changing
    the process GID to be the same as PID. If the DagProcessorAgent
    was terminated quickly (on a busy system) before the process
    could change the GID, the `reap_process_group` that was supposed
    to kill the whole group, was failing and the DagProcessorAgent
    remained running.
    
    This problem revealed a wrong behaviour of Airflow in some edge
    conditions when 'spawn' mode was used for starting the DAG processor
    Details are described in #19934, but this problem will have to be
    solved differently (avoiding ORM reinitialization during DAG
    processor starting).
    
    This change also moves the tests for `spawn` method out from
    test_scheduler_job.py (it was a remnant of old Airlfow and it
    did not really test what it was supposed to test). Instead tests
    were added for different spawn modes and killing the processor
    agent in both spawn and "default" mode.
    
    (cherry picked from commit 525484388464619832a14d1b28e06e3a097aac97)
---
 airflow/dag_processing/manager.py      | 10 +++++++
 airflow/utils/process_utils.py         | 55 ++++++++++++++++++++++++----------
 tests/dag_processing/test_processor.py | 50 +++++++++++++++++++++++++++++++
 3 files changed, 100 insertions(+), 15 deletions(-)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 19a97ff..00bffc5 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -260,6 +260,16 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
         os.environ['AIRFLOW__LOGGING__COLORED_CONSOLE_LOG'] = 'False'
         # Replicating the behavior of how logging module was loaded
         # in logging_config.py
+
+        # TODO: This reloading should be removed when we fix our logging behaviour
+        # In case of "spawn" method of starting processes for multiprocessing, reinitializing of the
+        # SQLAlchemy engine causes extremely unexpected behaviour of messing with objects already loaded
+        # in a parent process (likely via resources shared in memory by the ORM libraries).
+        # This caused flaky tests in our CI for many months and has been discovered while
+        # iterating on https://github.com/apache/airflow/pull/19860
+        # The issue that describes the problem and possible remediation is
+        # at https://github.com/apache/airflow/issues/19934
+
         importlib.reload(import_module(airflow.settings.LOGGING_CLASS_PATH.rsplit('.', 1)[0]))  # type: ignore
         importlib.reload(airflow.settings)
         airflow.settings.initialize()
diff --git a/airflow/utils/process_utils.py b/airflow/utils/process_utils.py
index 3c03917..3b12115 100644
--- a/airflow/utils/process_utils.py
+++ b/airflow/utils/process_utils.py
@@ -45,7 +45,7 @@ DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM = conf.getint('core', 'KILLED_TASK_CLEANUP_TI
 
 
 def reap_process_group(
-    pgid: int,
+    process_group_id: int,
     logger,
     sig: 'signal.Signals' = signal.SIGTERM,
     timeout: int = DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM,
@@ -55,7 +55,11 @@ def reap_process_group(
     sig (SIGTERM) to the process group of pid. If any process is alive after timeout
     a SIGKILL will be send.
 
-    :param pgid: process group id to kill
+    :param process_group_id: process group id to kill.
+           The process that wants to create the group should run `os.setpgid(0, 0)` as the first
+           command it executes which will set group id = process_id. Effectively the process that is the
+           "root" of the group has pid = gid and all other processes in the group have different
+           pids but the same gid (equal the pid of the root process)
     :param logger: log handler
     :param sig: signal type
     :param timeout: how much time a process has to terminate
@@ -68,36 +72,57 @@ def reap_process_group(
 
     def signal_procs(sig):
         try:
-            os.killpg(pgid, sig)
-        except OSError as err:
+            logger.info("Sending the signal %s to group %s", sig, process_group_id)
+            os.killpg(process_group_id, sig)
+        except OSError as err_killpg:
             # If operation not permitted error is thrown due to run_as_user,
             # use sudo -n(--non-interactive) to kill the process
-            if err.errno == errno.EPERM:
+            if err_killpg.errno == errno.EPERM:
                 subprocess.check_call(
-                    ["sudo", "-n", "kill", "-" + str(int(sig))] + [str(p.pid) for p in children]
+                    ["sudo", "-n", "kill", "-" + str(int(sig))]
+                    + [str(p.pid) for p in all_processes_in_the_group]
+                )
+            elif err_killpg.errno == errno.ESRCH:
+                # There is a rare condition that the process has not managed yet to change it's process
+                # group. In this case os.killpg fails with ESRCH error
+                # So we additionally send a kill signal to the process itself.
+                logger.info(
+                    "Sending the signal %s to process %s as process group is missing.", sig, process_group_id
                 )
+                try:
+                    os.kill(process_group_id, sig)
+                except OSError as err_kill:
+                    if err_kill.errno == errno.EPERM:
+                        subprocess.check_call(["sudo", "-n", "kill", "-" + str(process_group_id)])
+                    else:
+                        raise
             else:
                 raise
 
-    if pgid == os.getpgid(0):
+    if process_group_id == os.getpgid(0):
         raise RuntimeError("I refuse to kill myself")
 
     try:
-        parent = psutil.Process(pgid)
+        parent = psutil.Process(process_group_id)
 
-        children = parent.children(recursive=True)
-        children.append(parent)
+        all_processes_in_the_group = parent.children(recursive=True)
+        all_processes_in_the_group.append(parent)
     except psutil.NoSuchProcess:
         # The process already exited, but maybe it's children haven't.
-        children = []
+        all_processes_in_the_group = []
         for proc in psutil.process_iter():
             try:
-                if os.getpgid(proc.pid) == pgid and proc.pid != 0:
-                    children.append(proc)
+                if os.getpgid(proc.pid) == process_group_id and proc.pid != 0:
+                    all_processes_in_the_group.append(proc)
             except OSError:
                 pass
 
-    logger.info("Sending %s to GPID %s", sig, pgid)
+    logger.info(
+        "Sending %s to group %s. PIDs of all processes in the group: %s",
+        sig,
+        process_group_id,
+        [p.pid for p in all_processes_in_the_group],
+    )
     try:
         signal_procs(sig)
     except OSError as err:
@@ -106,7 +131,7 @@ def reap_process_group(
         if err.errno == errno.ESRCH:
             return returncodes
 
-    _, alive = psutil.wait_procs(children, timeout=timeout, callback=on_terminate)
+    _, alive = psutil.wait_procs(all_processes_in_the_group, timeout=timeout, callback=on_terminate)
 
     if alive:
         for proc in alive:
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index 6bfec8f..c9ecfb0 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -27,6 +27,7 @@ import pytest
 
 from airflow import settings
 from airflow.configuration import TEST_DAGS_FOLDER, conf
+from airflow.dag_processing.manager import DagFileProcessorAgent
 from airflow.dag_processing.processor import DagFileProcessor
 from airflow.models import DagBag, DagModel, SlaMiss, TaskInstance, errors
 from airflow.models.taskinstance import SimpleTaskInstance
@@ -723,3 +724,52 @@ class TestDagFileProcessor:
         dags = session.query(DagModel).all()
         assert [dag.dag_id for dag in dags if dag.is_active] == ['test_only_dummy_tasks']
         assert [dag.dag_id for dag in dags if not dag.is_active] == ['missing_dag']
+
+
+class TestProcessorAgent:
+    @pytest.fixture(autouse=True)
+    def per_test(self):
+        self.processor_agent = None
+        yield
+        if self.processor_agent:
+            self.processor_agent.end()
+
+    def test_error_when_waiting_in_async_mode(self, tmp_path):
+        self.processor_agent = DagFileProcessorAgent(
+            dag_directory=str(tmp_path),
+            max_runs=1,
+            processor_timeout=datetime.timedelta(1),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True,
+        )
+        self.processor_agent.start()
+        with pytest.raises(RuntimeError, match="wait_until_finished should only be called in sync_mode"):
+            self.processor_agent.wait_until_finished()
+
+    def test_default_multiprocessing_behaviour(self, tmp_path):
+        self.processor_agent = DagFileProcessorAgent(
+            dag_directory=str(tmp_path),
+            max_runs=1,
+            processor_timeout=datetime.timedelta(1),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=False,
+        )
+        self.processor_agent.start()
+        self.processor_agent.run_single_parsing_loop()
+        self.processor_agent.wait_until_finished()
+
+    @conf_vars({("core", "mp_start_method"): "spawn"})
+    def test_spawn_multiprocessing_behaviour(self, tmp_path):
+        self.processor_agent = DagFileProcessorAgent(
+            dag_directory=str(tmp_path),
+            max_runs=1,
+            processor_timeout=datetime.timedelta(1),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=False,
+        )
+        self.processor_agent.start()
+        self.processor_agent.run_single_parsing_loop()
+        self.processor_agent.wait_until_finished()