Posted to commits@airflow.apache.org by qi...@apache.org on 2021/05/29 15:28:20 UTC

[airflow] branch v2-1-test updated (304e174 -> bdf9bc2)

This is an automated email from the ASF dual-hosted git repository.

qian pushed a change to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from 304e174  Sandbox templates (#15912)
     new 285791d  Fix Celery executor getting stuck randomly because of reset_signals in multiprocessing (#15989)
     new bdf9bc2  Fix dag.clear() to set multiple dags to running when necessary (#15382)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 UPDATING.md                                        | 10 +++
 .../endpoints/task_instance_endpoint.py            | 12 +--
 airflow/executors/celery_executor.py               | 24 ++----
 airflow/jobs/scheduler_job.py                      | 16 ++++
 airflow/models/dag.py                              | 21 ++---
 airflow/models/taskinstance.py                     | 19 +++--
 scripts/ci/docker-compose/base.yml                 |  2 +
 tests/executors/test_celery_executor.py            | 52 ++++++++++++
 tests/sensors/test_external_task_sensor.py         | 96 ++++++++++++++++++++++
 9 files changed, 206 insertions(+), 46 deletions(-)

[airflow] 02/02: Fix dag.clear() to set multiple dags to running when necessary (#15382)

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qian pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit bdf9bc20354cf924a9fe0929db8613e4974c785e
Author: yuqian90 <yu...@gmail.com>
AuthorDate: Sat May 29 23:01:39 2021 +0800

    Fix dag.clear() to set multiple dags to running when necessary (#15382)
    
    closes: #14260
    related: #9824
    
    When clearing tasks across DAGs using ExternalTaskMarker, the state of the external DagRun is not set to running, so cleared tasks in the external DAG will not start automatically if that DagRun is in a failed or succeeded state.
    
    Two changes are made to fix the issue:
    
    - Make clear_task_instances set the DagRun state to dag_run_state for all affected DagRuns.
    - Fix the DagRun filter in clear_task_instances. Previously it assumed that the
      execution_dates of all dag_ids are the same, which is not always correct.
    - Add test_external_task_marker_clear_activate to make sure the fix does the right thing.
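    
    For illustration, here is a minimal plain-Python sketch of why the old filter
    over-matched (the task instances below are hypothetical; the real fix builds an
    equivalent SQLAlchemy filter over DagRun rows, as shown in the diff):
    
        from collections import defaultdict
        from datetime import datetime
        
        # Hypothetical cleared task instances as (dag_id, execution_date) pairs.
        # Note that child_dag_1 on day 2 is NOT among them.
        day_1, day_2 = datetime(2021, 5, 1), datetime(2021, 5, 2)
        cleared = [("parent_dag_0", day_1), ("parent_dag_0", day_2), ("child_dag_1", day_1)]
        
        # Old filter: dag_id IN {all dag_ids} AND execution_date IN {all dates}.
        dag_ids = {dag_id for dag_id, _ in cleared}
        dates = {date for _, date in cleared}
        
        def old_match(dag_id, date):
            return dag_id in dag_ids and date in dates
        
        # New filter: OR over (dag_id == d AND execution_date IN dates_of_d).
        dates_by_dag_id = defaultdict(set)
        for dag_id, date in cleared:
            dates_by_dag_id[dag_id].add(date)
        
        def new_match(dag_id, date):
            return date in dates_by_dag_id[dag_id]
        
        # The old filter wrongly selects the untouched DagRun of child_dag_1 on
        # day 2; the new one correctly excludes it.
        assert old_match("child_dag_1", day_2)
        assert not new_match("child_dag_1", day_2)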
    
    (cherry picked from commit 2bca8a5425c234b04fdf32d6c50ae3a91cd08262)
---
 UPDATING.md                                        | 10 +++
 .../endpoints/task_instance_endpoint.py            | 12 +--
 airflow/models/dag.py                              | 21 ++---
 airflow/models/taskinstance.py                     | 19 +++--
 tests/sensors/test_external_task_sensor.py         | 96 ++++++++++++++++++++++
 5 files changed, 128 insertions(+), 30 deletions(-)

diff --git a/UPDATING.md b/UPDATING.md
index e7a9119..5b03ad8 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -69,6 +69,16 @@ https://developers.google.com/style/inclusive-documentation
 
 -->
 
+## Master
+
+### `activate_dag_runs` argument of the function `clear_task_instances` is replaced with `dag_run_state`
+
+To achieve the previous default behaviour of `clear_task_instances` with `activate_dag_runs=True`, no change is needed. To achieve the previous behaviour of `activate_dag_runs=False`, pass `dag_run_state=False` instead.
+
+### `dag.set_dag_runs_state` is deprecated
+
+The method `set_dag_runs_state` is no longer needed after a bug fix in PR: [#15382](https://github.com/apache/airflow/pull/15382). This method is now deprecated and will be removed in a future version.
+
 ## Airflow 2.1.0
 
 ### New "deprecated_api" extra
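For reference, the migration for callers of clear_task_instances is a one-line change
(a sketch with illustrative arguments, mirroring the endpoint change in the next file):

    # Before: clear task instances without touching DagRun state.
    clear_task_instances(tis, session, dag=dag, activate_dag_runs=False)

    # After: pass the desired DagRun state instead; False leaves DagRuns unchanged.
    clear_task_instances(tis, session, dag=dag, dag_run_state=False)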
diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py
index b84c59a..418bded 100644
--- a/airflow/api_connexion/endpoints/task_instance_endpoint.py
+++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py
@@ -251,18 +251,8 @@ def post_clear_task_instances(dag_id: str, session=None):
     task_instances = dag.clear(get_tis=True, **data)
     if not data["dry_run"]:
         clear_task_instances(
-            task_instances,
-            session,
-            dag=dag,
-            activate_dag_runs=False,  # We will set DagRun state later.
+            task_instances, session, dag=dag, dag_run_state=State.RUNNING if reset_dag_runs else False
         )
-        if reset_dag_runs:
-            dag.set_dag_runs_state(
-                session=session,
-                start_date=data["start_date"],
-                end_date=data["end_date"],
-                state=State.RUNNING,
-            )
     task_instances = task_instances.join(
         DR, and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date)
     ).add_column(DR.run_id)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index c90fb4f..8e96554 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1123,6 +1123,11 @@ class DAG(LoggingMixin):
         end_date: Optional[datetime] = None,
         dag_ids: List[str] = None,
     ) -> None:
+        warnings.warn(
+            "This method is deprecated and will be removed in a future version.",
+            DeprecationWarning,
+            stacklevel=2,
+        )
         dag_ids = dag_ids or [self.dag_id]
         query = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids))
         if start_date:
@@ -1172,7 +1177,8 @@ class DAG(LoggingMixin):
         :type include_subdags: bool
         :param include_parentdag: Clear tasks in the parent dag of the subdag.
         :type include_parentdag: bool
-        :param dag_run_state: state to set DagRun to
+        :param dag_run_state: state to set DagRun to. If set to False, dagrun state will not
+            be changed.
         :param dry_run: Find the tasks to clear but don't clear them.
         :type dry_run: bool
         :param session: The sqlalchemy session to use
@@ -1193,20 +1199,17 @@ class DAG(LoggingMixin):
         """
         TI = TaskInstance
         tis = session.query(TI)
-        dag_ids = []
         if include_subdags:
             # Crafting the right filter for dag_id and task_ids combo
             conditions = []
             for dag in self.subdags + [self]:
                 conditions.append((TI.dag_id == dag.dag_id) & TI.task_id.in_(dag.task_ids))
-                dag_ids.append(dag.dag_id)
             tis = tis.filter(or_(*conditions))
         else:
             tis = session.query(TI).filter(TI.dag_id == self.dag_id)
             tis = tis.filter(TI.task_id.in_(self.task_ids))
 
         if include_parentdag and self.is_subdag and self.parent_dag is not None:
-            dag_ids.append(self.parent_dag.dag_id)
             p_dag = self.parent_dag.sub_dag(
                 task_ids_or_regex=r"^{}$".format(self.dag_id.split('.')[1]),
                 include_upstream=False,
@@ -1340,15 +1343,7 @@ class DAG(LoggingMixin):
                 tis,
                 session,
                 dag=self,
-                activate_dag_runs=False,  # We will set DagRun state later.
-            )
-
-            self.set_dag_runs_state(
-                session=session,
-                start_date=start_date,
-                end_date=end_date,
-                state=dag_run_state,
-                dag_ids=dag_ids,
+                dag_run_state=dag_run_state,
             )
         else:
             count = 0
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index a7e94bd..ae7eeef 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -133,7 +133,7 @@ def set_error_file(error_file: str, error: Union[str, Exception]) -> None:
 def clear_task_instances(
     tis,
     session,
-    activate_dag_runs=True,
+    dag_run_state: str = State.RUNNING,
     dag=None,
 ):
     """
@@ -142,7 +142,8 @@ def clear_task_instances(
 
     :param tis: a list of task instances
     :param session: current session
-    :param activate_dag_runs: flag to check for active dag run
+    :param dag_run_state: state to set DagRun to. If set to False, dagrun state will not
+        be changed.
     :param dag: DAG object
     """
     job_ids = []
@@ -204,19 +205,25 @@ def clear_task_instances(
         for job in session.query(BaseJob).filter(BaseJob.id.in_(job_ids)).all():  # noqa
             job.state = State.SHUTDOWN
 
-    if activate_dag_runs and tis:
+    if dag_run_state is not False and tis:
         from airflow.models.dagrun import DagRun  # Avoid circular import
 
+        dates_by_dag_id = defaultdict(set)
+        for instance in tis:
+            dates_by_dag_id[instance.dag_id].add(instance.execution_date)
+
         drs = (
             session.query(DagRun)
             .filter(
-                DagRun.dag_id.in_({ti.dag_id for ti in tis}),
-                DagRun.execution_date.in_({ti.execution_date for ti in tis}),
+                or_(
+                    and_(DagRun.dag_id == dag_id, DagRun.execution_date.in_(dates))
+                    for dag_id, dates in dates_by_dag_id.items()
+                )
             )
             .all()
         )
         for dr in drs:
-            dr.state = State.RUNNING
+            dr.state = dag_run_state
             dr.start_date = timezone.utcnow()
 
 
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 55080c9..19161a7 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -416,6 +416,53 @@ def dag_bag_ext():
     return dag_bag
 
 
+@pytest.fixture
+def dag_bag_parent_child():
+    """
+    Create a DagBag with two DAGs looking like this. task_1 of child_dag_1 on day 1 depends on
+    task_0 of parent_dag_0 on day 1. Therefore, when task_0 of parent_dag_0 on day 1 and day 2
+    are cleared, parent_dag_0 DagRuns need to be set to running on both days, but child_dag_1
+    only needs to be set to running on day 1.
+
+                   day 1   day 2
+
+     parent_dag_0  task_0  task_0
+                     |
+                     |
+                     v
+     child_dag_1   task_1  task_1
+
+    """
+    dag_bag = DagBag(dag_folder=DEV_NULL, include_examples=False)
+
+    day_1 = DEFAULT_DATE
+
+    dag_0 = DAG("parent_dag_0", start_date=day_1, schedule_interval=None)
+    task_0 = ExternalTaskMarker(
+        task_id="task_0",
+        external_dag_id="child_dag_1",
+        external_task_id="task_1",
+        execution_date=day_1.isoformat(),
+        recursion_depth=3,
+        dag=dag_0,
+    )
+
+    dag_1 = DAG("child_dag_1", start_date=day_1, schedule_interval=None)
+    _ = ExternalTaskSensor(
+        task_id="task_1",
+        external_dag_id=dag_0.dag_id,
+        external_task_id=task_0.task_id,
+        execution_date_fn=lambda execution_date: day_1 if execution_date == day_1 else [],
+        mode='reschedule',
+        dag=dag_1,
+    )
+
+    for dag in [dag_0, dag_1]:
+        dag_bag.bag_dag(dag=dag, root_dag=dag)
+
+    return dag_bag
+
+
 def run_tasks(dag_bag, execution_date=DEFAULT_DATE):
     """
     Run all tasks in the DAGs in the given dag_bag. Return the TaskInstance objects as a dict
@@ -464,6 +511,55 @@ def test_external_task_marker_transitive(dag_bag_ext):
     assert_ti_state_equal(ti_b_3, State.NONE)
 
 
+# pylint: disable=redefined-outer-name
+def test_external_task_marker_clear_activate(dag_bag_parent_child):
+    """
+    Test clearing tasks across DAGs and make sure the right DagRuns are activated.
+    """
+    from airflow.utils.session import create_session
+    from airflow.utils.types import DagRunType
+
+    dag_bag = dag_bag_parent_child
+    day_1 = DEFAULT_DATE
+    day_2 = DEFAULT_DATE + timedelta(days=1)
+
+    run_tasks(dag_bag, execution_date=day_1)
+    run_tasks(dag_bag, execution_date=day_2)
+
+    with create_session() as session:
+        for dag in dag_bag.dags.values():
+            for execution_date in [day_1, day_2]:
+                dagrun = dag.create_dagrun(
+                    State.RUNNING, execution_date, run_type=DagRunType.MANUAL, session=session
+                )
+                dagrun.set_state(State.SUCCESS)
+                session.add(dagrun)
+
+        session.commit()
+
+    # Assert that dagruns of all the affected dags are set to SUCCESS before tasks are cleared.
+    for dag in dag_bag.dags.values():
+        for execution_date in [day_1, day_2]:
+            dagrun = dag.get_dagrun(execution_date=execution_date)
+            assert dagrun.state == State.SUCCESS
+
+    dag_0 = dag_bag.get_dag("parent_dag_0")
+    task_0 = dag_0.get_task("task_0")
+    clear_tasks(dag_bag, dag_0, task_0, start_date=day_1, end_date=day_2)
+
+    # Assert that dagruns of all the affected dags are set to RUNNING after tasks are cleared.
+    # Unaffected dagruns should be left as SUCCESS.
+    dagrun_0_1 = dag_bag.get_dag('parent_dag_0').get_dagrun(execution_date=day_1)
+    dagrun_0_2 = dag_bag.get_dag('parent_dag_0').get_dagrun(execution_date=day_2)
+    dagrun_1_1 = dag_bag.get_dag('child_dag_1').get_dagrun(execution_date=day_1)
+    dagrun_1_2 = dag_bag.get_dag('child_dag_1').get_dagrun(execution_date=day_2)
+
+    assert dagrun_0_1.state == State.RUNNING
+    assert dagrun_0_2.state == State.RUNNING
+    assert dagrun_1_1.state == State.RUNNING
+    assert dagrun_1_2.state == State.SUCCESS
+
+
 def test_external_task_marker_future(dag_bag_ext):
     """
     Test clearing tasks with no end_date. This is the case when users clear tasks with

[airflow] 01/02: Fix Celery executor getting stuck randomly because of reset_signals in multiprocessing (#15989)

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qian pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 285791d60317bb3faf0601124256a3b49bc33c46
Author: yuqian90 <yu...@gmail.com>
AuthorDate: Sat May 29 23:00:54 2021 +0800

    Fix Celery executor getting stuck randomly because of reset_signals in multiprocessing (#15989)
    
    Fixes #15938
    
    multiprocessing.Pool is known to occasionally become stuck, which causes celery_executor to hang randomly. This happens at least on Debian and Ubuntu with Python 3.8.7 and Python 3.8.10. The issue is reproducible by running test_send_tasks_to_celery_hang in this PR several times (with the db backend set to something other than sqlite, because sqlite disables some parallelization).
    
    The issue goes away once we switch to concurrent.futures.ProcessPoolExecutor. In Python 3.6 and earlier, ProcessPoolExecutor has no initializer argument. Fortunately, it is not needed here: reset_signals is no longer required because the signal handler now checks whether the current process is the parent.
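    
    A minimal runnable sketch of the pattern (hypothetical names, not the scheduler's
    actual code; it assumes the "fork" start method, where workers inherit the parent's
    signal handlers):
    
        import multiprocessing
        import signal
        import sys
        from concurrent.futures import ProcessPoolExecutor
        
        def _is_parent_process():
            # Workers started by multiprocessing get names like "ForkProcess-1";
            # only the original interpreter process is named "MainProcess".
            return multiprocessing.current_process().name == "MainProcess"
        
        def _exit_gracefully(signum, frame):
            if not _is_parent_process():
                # Inherited by forked workers: skip the parent-only cleanup.
                return
            print(f"parent exiting gracefully on signal {signum}")
            sys.exit(0)
        
        if __name__ == "__main__":
            signal.signal(signal.SIGTERM, _exit_gracefully)
            # No pool initializer is needed to reset signals in the workers
            # (ProcessPoolExecutor on Python <= 3.6 does not accept one): the
            # guard above makes the inherited handler a no-op in children.
            with ProcessPoolExecutor(max_workers=2) as pool:
                assert list(pool.map(abs, [-1, -2, -3])) == [1, 2, 3]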
    
    (cherry picked from commit f75dd7ae6e755dad328ba6f3fd462ade194dab25)
---
 airflow/executors/celery_executor.py    | 24 +++++----------
 airflow/jobs/scheduler_job.py           | 16 ++++++++++
 scripts/ci/docker-compose/base.yml      |  2 ++
 tests/executors/test_celery_executor.py | 52 +++++++++++++++++++++++++++++++++
 4 files changed, 78 insertions(+), 16 deletions(-)

diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index bc321c6..553639b 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -30,7 +30,8 @@ import subprocess
 import time
 import traceback
 from collections import OrderedDict
-from multiprocessing import Pool, cpu_count
+from concurrent.futures import ProcessPoolExecutor
+from multiprocessing import cpu_count
 from typing import Any, Dict, List, Mapping, MutableMapping, Optional, Set, Tuple, Union
 
 from celery import Celery, Task, states as celery_states
@@ -318,18 +319,9 @@ class CeleryExecutor(BaseExecutor):
         chunksize = self._num_tasks_per_send_process(len(task_tuples_to_send))
         num_processes = min(len(task_tuples_to_send), self._sync_parallelism)
 
-        def reset_signals():
-            # Since we are run from inside the SchedulerJob, we don't want to
-            # inherit the signal handlers that we registered there.
-            import signal
-
-            signal.signal(signal.SIGINT, signal.SIG_DFL)
-            signal.signal(signal.SIGTERM, signal.SIG_DFL)
-            signal.signal(signal.SIGUSR2, signal.SIG_DFL)
-
-        with Pool(processes=num_processes, initializer=reset_signals) as send_pool:
-            key_and_async_results = send_pool.map(
-                send_task_to_executor, task_tuples_to_send, chunksize=chunksize
+        with ProcessPoolExecutor(max_workers=num_processes) as send_pool:
+            key_and_async_results = list(
+                send_pool.map(send_task_to_executor, task_tuples_to_send, chunksize=chunksize)
             )
         return key_and_async_results
 
@@ -592,11 +584,11 @@ class BulkStateFetcher(LoggingMixin):
     def _get_many_using_multiprocessing(self, async_results) -> Mapping[str, EventBufferValueType]:
         num_process = min(len(async_results), self._sync_parallelism)
 
-        with Pool(processes=num_process) as sync_pool:
+        with ProcessPoolExecutor(max_workers=num_process) as sync_pool:
             chunksize = max(1, math.floor(math.ceil(1.0 * len(async_results) / self._sync_parallelism)))
 
-            task_id_to_states_and_info = sync_pool.map(
-                fetch_celery_task_state, async_results, chunksize=chunksize
+            task_id_to_states_and_info = list(
+                sync_pool.map(fetch_celery_task_state, async_results, chunksize=chunksize)
             )
 
             states_and_info_by_task_id: MutableMapping[str, EventBufferValueType] = {}
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index e86a6e7..cece87e 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -670,6 +670,14 @@ class DagFileProcessor(LoggingMixin):
         return len(dagbag.dags), len(dagbag.import_errors)
 
 
+def _is_parent_process():
+    """
+    Returns True if the current process is the parent process. False if the current process is a child
+    process started by multiprocessing.
+    """
+    return multiprocessing.current_process().name == 'MainProcess'
+
+
 class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
     """
     This SchedulerJob runs for a specific time interval and schedules the jobs
@@ -745,12 +753,20 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
 
     def _exit_gracefully(self, signum, frame) -> None:  # pylint: disable=unused-argument
         """Helper method to clean up processor_agent to avoid leaving orphan processes."""
+        if not _is_parent_process():
+            # Only the parent process should perform the cleanup.
+            return
+
         self.log.info("Exiting gracefully upon receiving signal %s", signum)
         if self.processor_agent:
             self.processor_agent.end()
         sys.exit(os.EX_OK)
 
     def _debug_dump(self, signum, frame):  # pylint: disable=unused-argument
+        if not _is_parent_process():
+            # Only the parent process should perform the debug dump.
+            return
+
         try:
             sig_name = signal.Signals(signum).name  # pylint: disable=no-member
         except Exception:  # pylint: disable=broad-except
diff --git a/scripts/ci/docker-compose/base.yml b/scripts/ci/docker-compose/base.yml
index eab6425..6b1cb4e 100644
--- a/scripts/ci/docker-compose/base.yml
+++ b/scripts/ci/docker-compose/base.yml
@@ -34,6 +34,8 @@ services:
     ports:
       - "${WEBSERVER_HOST_PORT}:8080"
       - "${FLOWER_HOST_PORT}:5555"
+    cap_add:
+      - SYS_PTRACE
 volumes:
   sqlite-db-volume:
   postgres-db-volume:
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index f454c5a..19c8a0d 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -18,6 +18,7 @@
 import contextlib
 import json
 import os
+import signal
 import sys
 import unittest
 from datetime import datetime, timedelta
@@ -484,3 +485,54 @@ class TestBulkStateFetcher(unittest.TestCase):
         assert [
             'DEBUG:airflow.executors.celery_executor.BulkStateFetcher:Fetched 2 state(s) for 2 task(s)'
         ] == cm.output
+
+
+class MockTask:
+    """
+    A picklable object used to mock tasks sent to Celery. Can't use the mock library
+    here because it's not picklable.
+    """
+
+    def apply_async(self, *args, **kwargs):
+        return 1
+
+
+def _exit_gracefully(signum, _):
+    print(f"{os.getpid()} Exiting gracefully upon receiving signal {signum}")
+    sys.exit(signum)
+
+
+@pytest.fixture
+def register_signals():
+    """
+    Register the same signal handlers as the scheduler does, to check that celery_executor
+    does not hang.
+    """
+    orig_sigint = orig_sigterm = orig_sigusr2 = signal.SIG_DFL
+
+    orig_sigint = signal.signal(signal.SIGINT, _exit_gracefully)
+    orig_sigterm = signal.signal(signal.SIGTERM, _exit_gracefully)
+    orig_sigusr2 = signal.signal(signal.SIGUSR2, _exit_gracefully)
+
+    yield
+
+    # Restore original signal handlers after test
+    signal.signal(signal.SIGINT, orig_sigint)
+    signal.signal(signal.SIGTERM, orig_sigterm)
+    signal.signal(signal.SIGUSR2, orig_sigusr2)
+
+
+def test_send_tasks_to_celery_hang(register_signals):  # pylint: disable=unused-argument
+    """
+    Test that celery_executor does not hang after many runs.
+    """
+    executor = celery_executor.CeleryExecutor()
+
+    task = MockTask()
+    task_tuples_to_send = [(None, None, None, None, task) for _ in range(26)]
+
+    for _ in range(500):
+        # This loop can hang on Linux if celery_executor does something wrong with
+        # multiprocessing.
+        results = executor._send_tasks_to_celery(task_tuples_to_send)
+        assert results == [(None, None, 1) for _ in task_tuples_to_send]