You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/08/19 15:05:18 UTC

[airflow] branch main updated: Rename ``task_concurrency`` to ``max_active_tis_per_dag`` (#17708)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 35a6c30  Rename ``task_concurrency`` to ``max_active_tis_per_dag`` (#17708)
35a6c30 is described below

commit 35a6c302772bf5d0bb74646b2da04856c7d7aaac
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Thu Aug 19 16:05:02 2021 +0100

    Rename ``task_concurrency`` to ``max_active_tis_per_dag`` (#17708)
    
    Follow-up of https://github.com/apache/airflow/pull/16267
    
    Renames `task_concurrency` to `max_active_tis_per_dag`
    
    Some of Airflow's concurrency settings have been a source of confusion for a lot of users (including me), for example:
    
    https://stackoverflow.com/questions/56370720/how-to-control-the-parallelism-or-concurrency-of-an-airflow-installation
    https://stackoverflow.com/questions/38200666/airflow-parallelism
    This PR is an attempt to make the settings easier to understand.
---
 UPDATING.md                                   | 22 ++++++++++++++++++++++
 airflow/jobs/backfill_job.py                  |  4 ++--
 airflow/jobs/scheduler_job.py                 |  2 +-
 airflow/models/baseoperator.py                | 17 +++++++++++++----
 airflow/models/dag.py                         |  2 +-
 airflow/models/dagrun.py                      |  2 +-
 airflow/ti_deps/deps/task_concurrency_dep.py  |  4 ++--
 docs/apache-airflow/faq.rst                   |  2 +-
 tests/conftest.py                             |  4 ++--
 tests/jobs/test_backfill_job.py               | 16 ++++++++--------
 tests/jobs/test_scheduler_job.py              | 12 ++++++------
 tests/models/test_dagrun.py                   |  2 +-
 tests/models/test_taskinstance.py             | 10 +++++-----
 tests/serialization/test_dag_serialization.py |  2 +-
 tests/ti_deps/deps/test_task_concurrency.py   |  4 ++--
 15 files changed, 68 insertions(+), 37 deletions(-)

diff --git a/UPDATING.md b/UPDATING.md
index 361d12f..b5dd047 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -148,6 +148,28 @@ dag = DAG(
 
 If you are using the DAGs Details API endpoint, use `max_active_tasks` instead of `concurrency`.
 
+### Task concurrency parameter has been renamed
+
+`BaseOperator.task_concurrency` has been deprecated and renamed to `max_active_tis_per_dag` to
+make its meaning clearer.
+
+This parameter controls the number of concurrent running task instances across ``dag_runs``
+per task.
+
+**Before**:
+
+```python
+with DAG(dag_id="task_concurrency_example"):
+    BashOperator(task_id="t1", task_concurrency=2, bash_command="echo Hi")
+```
+
+**After**:
+
+```python
+with DAG(dag_id="task_concurrency_example"):
+    BashOperator(task_id="t1", max_active_tis_per_dag=2, bash_command="echo Hi")
+```
+
 ### Marking success/failed automatically clears failed downstream tasks
 
 When marking a task success/failed in Graph View, its downstream tasks that are in failed/upstream_failed state are automatically cleared.
diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py
index 281eaeb..8e120a9 100644
--- a/airflow/jobs/backfill_job.py
+++ b/airflow/jobs/backfill_job.py
@@ -587,14 +587,14 @@ class BackfillJob(BaseJob):
                                 "Not scheduling since DAG max_active_tasks limit is reached."
                             )
 
-                        if task.task_concurrency:
+                        if task.max_active_tis_per_dag:
                             num_running_task_instances_in_task = DAG.get_num_task_instances(
                                 dag_id=self.dag_id,
                                 task_ids=[task.task_id],
                                 states=self.STATES_COUNT_AS_RUNNING,
                             )
 
-                            if num_running_task_instances_in_task >= task.task_concurrency:
+                            if num_running_task_instances_in_task >= task.max_active_tis_per_dag:
                                 raise TaskConcurrencyLimitReached(
                                     "Not scheduling since Task concurrency limit is reached."
                                 )
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index e0845c6..54e8259 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -430,7 +430,7 @@ class SchedulerJob(BaseJob):
                     if serialized_dag.has_task(task_instance.task_id):
                         task_concurrency_limit = serialized_dag.get_task(
                             task_instance.task_id
-                        ).task_concurrency
+                        ).max_active_tis_per_dag
 
                     if task_concurrency_limit is not None:
                         current_task_concurrency = task_concurrency_map[
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 7273e3f..78a0fc9 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -360,9 +360,9 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
     :type resources: dict
     :param run_as_user: unix username to impersonate while running the task
     :type run_as_user: str
-    :param task_concurrency: When set, a task will be able to limit the concurrent
-        runs across execution_dates
-    :type task_concurrency: int
+    :param max_active_tis_per_dag: When set, a task will be able to limit the concurrent
+        runs across execution_dates.
+    :type max_active_tis_per_dag: int
     :param executor_config: Additional task-level configuration parameters that are
         interpreted by a specific executor. Parameters are namespaced by the name of
         executor.
@@ -492,6 +492,7 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
         resources: Optional[Dict] = None,
         run_as_user: Optional[str] = None,
         task_concurrency: Optional[int] = None,
+        max_active_tis_per_dag: Optional[int] = None,
         executor_config: Optional[Dict] = None,
         do_xcom_push: bool = True,
         inlets: Optional[Any] = None,
@@ -624,7 +625,15 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
         self.weight_rule = weight_rule
         self.resources: Optional[Resources] = Resources(**resources) if resources else None
         self.run_as_user = run_as_user
-        self.task_concurrency = task_concurrency
+        if task_concurrency and not max_active_tis_per_dag:
+            # TODO: Remove in Airflow 3.0
+            warnings.warn(
+                "The 'task_concurrency' parameter is deprecated. Please use 'max_active_tis_per_dag'.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            max_active_tis_per_dag = task_concurrency
+        self.max_active_tis_per_dag = max_active_tis_per_dag
         self.executor_config = executor_config or {}
         self.do_xcom_push = do_xcom_push
 
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index d412fb1..d480906 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2233,7 +2233,7 @@ class DAG(LoggingMixin):
             orm_dag.description = dag.description
             orm_dag.schedule_interval = dag.schedule_interval
             orm_dag.max_active_tasks = dag.max_active_tasks
-            orm_dag.has_task_concurrency_limits = any(t.task_concurrency is not None for t in dag.tasks)
+            orm_dag.has_task_concurrency_limits = any(t.max_active_tis_per_dag is not None for t in dag.tasks)
 
             orm_dag.calculate_dagrun_date_fields(
                 dag,
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 098fd5b..cdc036e 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -412,7 +412,7 @@ class DagRun(Base, LoggingMixin):
             unfinished_tasks = info.unfinished_tasks
 
             none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks)
-            none_task_concurrency = all(t.task.task_concurrency is None for t in unfinished_tasks)
+            none_task_concurrency = all(t.task.max_active_tis_per_dag is None for t in unfinished_tasks)
             none_deferred = all(t.state != State.DEFERRED for t in unfinished_tasks)
 
             if unfinished_tasks and none_depends_on_past and none_task_concurrency and none_deferred:
diff --git a/airflow/ti_deps/deps/task_concurrency_dep.py b/airflow/ti_deps/deps/task_concurrency_dep.py
index 6334511..264edb8 100644
--- a/airflow/ti_deps/deps/task_concurrency_dep.py
+++ b/airflow/ti_deps/deps/task_concurrency_dep.py
@@ -29,11 +29,11 @@ class TaskConcurrencyDep(BaseTIDep):
 
     @provide_session
     def _get_dep_statuses(self, ti, session, dep_context):
-        if ti.task.task_concurrency is None:
+        if ti.task.max_active_tis_per_dag is None:
             yield self._passing_status(reason="Task concurrency is not set.")
             return
 
-        if ti.get_num_running_task_instances(session) >= ti.task.task_concurrency:
+        if ti.get_num_running_task_instances(session) >= ti.task.max_active_tis_per_dag:
             yield self._failing_status(reason="The max task concurrency has been reached.")
             return
         else:
diff --git a/docs/apache-airflow/faq.rst b/docs/apache-airflow/faq.rst
index 25be346..645c242 100644
--- a/docs/apache-airflow/faq.rst
+++ b/docs/apache-airflow/faq.rst
 DAGs have configurations that improve efficiency:
 
 Operators or tasks also have configurations that improve efficiency and scheduling priority:
 
-- ``task_concurrency``: This parameter controls the number of concurrent running task instances across ``dag_runs``
+- ``max_active_tis_per_dag``: This parameter controls the number of concurrent running task instances across ``dag_runs``
   per task.
 - ``pool``: See :ref:`concepts:pool`.
 - ``priority_weight``: See :ref:`concepts:priority-weight`.
diff --git a/tests/conftest.py b/tests/conftest.py
index a212340..bcfc91f 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -612,7 +612,7 @@ def create_dummy_dag(dag_maker):
     def create_dag(
         dag_id='dag',
         task_id='op1',
-        task_concurrency=16,
+        max_active_tis_per_dag=16,
         pool='default_pool',
         executor_config={},
         trigger_rule='all_done',
@@ -626,7 +626,7 @@ def create_dummy_dag(dag_maker):
         with dag_maker(dag_id, **kwargs) as dag:
             op = DummyOperator(
                 task_id=task_id,
-                task_concurrency=task_concurrency,
+                max_active_tis_per_dag=max_active_tis_per_dag,
                 executor_config=executor_config,
                 on_success_callback=on_success_callback,
                 on_execute_callback=on_execute_callback,
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index cf3b5c5..ba9e0da 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -77,12 +77,12 @@ class TestBackfillJob:
         dag_maker_fixture,
         dag_id='test_dag',
         pool=Pool.DEFAULT_POOL_NAME,
-        task_concurrency=None,
+        max_active_tis_per_dag=None,
         task_id='op',
         **kwargs,
     ):
         with dag_maker_fixture(dag_id=dag_id, schedule_interval='@daily', **kwargs) as dag:
-            DummyOperator(task_id=task_id, pool=pool, task_concurrency=task_concurrency)
+            DummyOperator(task_id=task_id, pool=pool, max_active_tis_per_dag=max_active_tis_per_dag)
 
         return dag
 
@@ -295,12 +295,12 @@ class TestBackfillJob:
         assert conf_ == dr[0].conf
 
     @patch('airflow.jobs.backfill_job.BackfillJob.log')
-    def test_backfill_respect_task_concurrency_limit(self, mock_log, dag_maker):
-        task_concurrency = 2
+    def test_backfill_respect_max_active_tis_per_dag_limit(self, mock_log, dag_maker):
+        max_active_tis_per_dag = 2
         dag = self._get_dummy_dag(
             dag_maker,
-            dag_id='test_backfill_respect_task_concurrency_limit',
-            task_concurrency=task_concurrency,
+            dag_id='test_backfill_respect_max_active_tis_per_dag_limit',
+            max_active_tis_per_dag=max_active_tis_per_dag,
         )
         dag_maker.create_dagrun()
 
@@ -321,9 +321,9 @@ class TestBackfillJob:
 
         num_running_task_instances = 0
         for running_task_instances in executor.history:
-            assert len(running_task_instances) <= task_concurrency
+            assert len(running_task_instances) <= max_active_tis_per_dag
             num_running_task_instances += len(running_task_instances)
-            if len(running_task_instances) == task_concurrency:
+            if len(running_task_instances) == max_active_tis_per_dag:
                 task_concurrency_limit_reached_at_least_once = True
 
         assert 8 == num_running_task_instances
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index f8e8e53..8902240 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -692,12 +692,12 @@ class TestSchedulerJob:
         session.rollback()
 
     # TODO: This is a hack, I think I need to just remove the setting and have it on always
-    def test_find_executable_task_instances_task_concurrency(self, dag_maker):
-        dag_id = 'SchedulerJobTest.test_find_executable_task_instances_task_concurrency'
+    def test_find_executable_task_instances_max_active_tis_per_dag(self, dag_maker):
+        dag_id = 'SchedulerJobTest.test_find_executable_task_instances_max_active_tis_per_dag'
         task_id_1 = 'dummy'
         task_id_2 = 'dummy2'
         with dag_maker(dag_id=dag_id, max_active_tasks=16) as dag:
-            task1 = DummyOperator(task_id=task_id_1, task_concurrency=2)
+            task1 = DummyOperator(task_id=task_id_1, max_active_tis_per_dag=2)
             task2 = DummyOperator(task_id=task_id_2)
 
         executor = MockExecutor(do_update=True)
@@ -2911,15 +2911,15 @@ class TestSchedulerJob:
             ],
         ],
     )
-    def test_dag_file_processor_process_task_instances_with_task_concurrency(
+    def test_dag_file_processor_process_task_instances_with_max_active_tis_per_dag(
         self, state, start_date, end_date, dag_maker
     ):
         """
         Test if _process_task_instances puts the right task instances into the
         mock_list.
         """
-        with dag_maker(dag_id='test_scheduler_process_execute_task_with_task_concurrency'):
-            BashOperator(task_id='dummy', task_concurrency=2, bash_command='echo Hi')
+        with dag_maker(dag_id='test_scheduler_process_execute_task_with_max_active_tis_per_dag'):
+            BashOperator(task_id='dummy', max_active_tis_per_dag=2, bash_command='echo Hi')
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         self.scheduler_job.processor_agent = mock.MagicMock()
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 59f03f0..fb6a099 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -262,7 +262,7 @@ class TestDagRun(unittest.TestCase):
         dag = DAG('test_dagrun_no_deadlock', start_date=DEFAULT_DATE)
         with dag:
             DummyOperator(task_id='dop', depends_on_past=True)
-            DummyOperator(task_id='tc', task_concurrency=1)
+            DummyOperator(task_id='tc', max_active_tis_per_dag=1)
 
         dag.clear()
         dr = dag.create_dagrun(
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 35f2af3..ded8901 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -289,11 +289,11 @@ class TestTaskInstance:
         ti.run()
         assert ti.state == State.NONE
 
-    def test_requeue_over_task_concurrency(self, create_dummy_dag):
+    def test_requeue_over_max_active_tis_per_dag(self, create_dummy_dag):
         _, task = create_dummy_dag(
-            dag_id='test_requeue_over_task_concurrency',
-            task_id='test_requeue_over_task_concurrency_op',
-            task_concurrency=0,
+            dag_id='test_requeue_over_max_active_tis_per_dag',
+            task_id='test_requeue_over_max_active_tis_per_dag_op',
+            max_active_tis_per_dag=0,
             max_active_runs=1,
             max_active_tasks=2,
         )
@@ -310,7 +310,7 @@ class TestTaskInstance:
         _, task = create_dummy_dag(
             dag_id='test_requeue_over_pool_concurrency',
             task_id='test_requeue_over_pool_concurrency_op',
-            task_concurrency=0,
+            max_active_tis_per_dag=0,
             max_active_runs=1,
             max_active_tasks=2,
         )
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 7e2798c..826b10f 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -887,6 +887,7 @@ class TestStringifiedDAGs(unittest.TestCase):
             'executor_config': {},
             'inlets': [],
             'label': '10',
+            'max_active_tis_per_dag': None,
             'max_retry_delay': None,
             'on_execute_callback': None,
             'on_failure_callback': None,
@@ -907,7 +908,6 @@ class TestStringifiedDAGs(unittest.TestCase):
             'sla': None,
             'start_date': None,
             'subdag': None,
-            'task_concurrency': None,
             'task_id': '10',
             'trigger_rule': 'all_success',
             'wait_for_downstream': False,
diff --git a/tests/ti_deps/deps/test_task_concurrency.py b/tests/ti_deps/deps/test_task_concurrency.py
index e723c5a..fcc4067 100644
--- a/tests/ti_deps/deps/test_task_concurrency.py
+++ b/tests/ti_deps/deps/test_task_concurrency.py
@@ -38,14 +38,14 @@ class TestTaskConcurrencyDep(unittest.TestCase):
         assert TaskConcurrencyDep().is_met(ti=ti, dep_context=dep_context)
 
     def test_not_reached_concurrency(self):
-        task = self._get_task(start_date=datetime(2016, 1, 1), task_concurrency=1)
+        task = self._get_task(start_date=datetime(2016, 1, 1), max_active_tis_per_dag=1)
         dep_context = DepContext()
         ti = Mock(task=task, execution_date=datetime(2016, 1, 1))
         ti.get_num_running_task_instances = lambda x: 0
         assert TaskConcurrencyDep().is_met(ti=ti, dep_context=dep_context)
 
     def test_reached_concurrency(self):
-        task = self._get_task(start_date=datetime(2016, 1, 1), task_concurrency=2)
+        task = self._get_task(start_date=datetime(2016, 1, 1), max_active_tis_per_dag=2)
         dep_context = DepContext()
         ti = Mock(task=task, execution_date=datetime(2016, 1, 1))
         ti.get_num_running_task_instances = lambda x: 1