You are viewing a plain-text version of this content; the canonical link was lost in this text-only rendering.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/08/19 15:05:18 UTC
[airflow] branch main updated: Rename ``task_concurrency`` to
``max_active_tis_per_dag`` (#17708)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 35a6c30 Rename ``task_concurrency`` to ``max_active_tis_per_dag`` (#17708)
35a6c30 is described below
commit 35a6c302772bf5d0bb74646b2da04856c7d7aaac
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Thu Aug 19 16:05:02 2021 +0100
Rename ``task_concurrency`` to ``max_active_tis_per_dag`` (#17708)
Follow-up of https://github.com/apache/airflow/pull/16267
Renames `task_concurrency` to `max_active_tis_per_dag`
Some of Airflow's concurrency settings have been a source of confusion for a lot of users (including me), for example:
https://stackoverflow.com/questions/56370720/how-to-control-the-parallelism-or-concurrency-of-an-airflow-installation
https://stackoverflow.com/questions/38200666/airflow-parallelism
This PR is an attempt to make these settings easier to understand.
---
UPDATING.md | 22 ++++++++++++++++++++++
airflow/jobs/backfill_job.py | 4 ++--
airflow/jobs/scheduler_job.py | 2 +-
airflow/models/baseoperator.py | 17 +++++++++++++----
airflow/models/dag.py | 2 +-
airflow/models/dagrun.py | 2 +-
airflow/ti_deps/deps/task_concurrency_dep.py | 4 ++--
docs/apache-airflow/faq.rst | 2 +-
tests/conftest.py | 4 ++--
tests/jobs/test_backfill_job.py | 16 ++++++++--------
tests/jobs/test_scheduler_job.py | 12 ++++++------
tests/models/test_dagrun.py | 2 +-
tests/models/test_taskinstance.py | 10 +++++-----
tests/serialization/test_dag_serialization.py | 2 +-
tests/ti_deps/deps/test_task_concurrency.py | 4 ++--
15 files changed, 68 insertions(+), 37 deletions(-)
diff --git a/UPDATING.md b/UPDATING.md
index 361d12f..b5dd047 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -148,6 +148,28 @@ dag = DAG(
If you are using DAGs Details API endpoint, use `max_active_tasks` instead of `concurrency`.
+### Task concurrency parameter has been renamed
+
+`BaseOperator.task_concurrency` has been deprecated and renamed to `max_active_tis_per_dag` for
+better understanding.
+
+This parameter controls the number of concurrent running task instances across ``dag_runs``
+per task.
+
+**Before**:
+
+```python
+with DAG(dag_id="task_concurrency_example"):
+ BashOperator(task_id="t1", task_concurrency=2, bash_command="echo Hi")
+```
+
+**After**:
+
+```python
+with DAG(dag_id="task_concurrency_example"):
+ BashOperator(task_id="t1", max_active_tis_per_dag=2, bash_command="echo Hi")
+```
+
### Marking success/failed automatically clears failed downstream tasks
When marking a task success/failed in Graph View, its downstream tasks that are in failed/upstream_failed state are automatically cleared.
diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py
index 281eaeb..8e120a9 100644
--- a/airflow/jobs/backfill_job.py
+++ b/airflow/jobs/backfill_job.py
@@ -587,14 +587,14 @@ class BackfillJob(BaseJob):
"Not scheduling since DAG max_active_tasks limit is reached."
)
- if task.task_concurrency:
+ if task.max_active_tis_per_dag:
num_running_task_instances_in_task = DAG.get_num_task_instances(
dag_id=self.dag_id,
task_ids=[task.task_id],
states=self.STATES_COUNT_AS_RUNNING,
)
- if num_running_task_instances_in_task >= task.task_concurrency:
+ if num_running_task_instances_in_task >= task.max_active_tis_per_dag:
raise TaskConcurrencyLimitReached(
"Not scheduling since Task concurrency limit is reached."
)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index e0845c6..54e8259 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -430,7 +430,7 @@ class SchedulerJob(BaseJob):
if serialized_dag.has_task(task_instance.task_id):
task_concurrency_limit = serialized_dag.get_task(
task_instance.task_id
- ).task_concurrency
+ ).max_active_tis_per_dag
if task_concurrency_limit is not None:
current_task_concurrency = task_concurrency_map[
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 7273e3f..78a0fc9 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -360,9 +360,9 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
:type resources: dict
:param run_as_user: unix username to impersonate while running the task
:type run_as_user: str
- :param task_concurrency: When set, a task will be able to limit the concurrent
- runs across execution_dates
- :type task_concurrency: int
+ :param max_active_tis_per_dag: When set, a task will be able to limit the concurrent
+ runs across execution_dates.
+ :type max_active_tis_per_dag: int
:param executor_config: Additional task-level configuration parameters that are
interpreted by a specific executor. Parameters are namespaced by the name of
executor.
@@ -492,6 +492,7 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
resources: Optional[Dict] = None,
run_as_user: Optional[str] = None,
task_concurrency: Optional[int] = None,
+ max_active_tis_per_dag: Optional[int] = None,
executor_config: Optional[Dict] = None,
do_xcom_push: bool = True,
inlets: Optional[Any] = None,
@@ -624,7 +625,15 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
self.weight_rule = weight_rule
self.resources: Optional[Resources] = Resources(**resources) if resources else None
self.run_as_user = run_as_user
- self.task_concurrency = task_concurrency
+ if task_concurrency and not max_active_tis_per_dag:
+ # TODO: Remove in Airflow 3.0
+ warnings.warn(
+ "The 'task_concurrency' parameter is deprecated. Please use 'max_active_tis_per_dag'.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ max_active_tis_per_dag = task_concurrency
+ self.max_active_tis_per_dag = max_active_tis_per_dag
self.executor_config = executor_config or {}
self.do_xcom_push = do_xcom_push
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index d412fb1..d480906 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2233,7 +2233,7 @@ class DAG(LoggingMixin):
orm_dag.description = dag.description
orm_dag.schedule_interval = dag.schedule_interval
orm_dag.max_active_tasks = dag.max_active_tasks
- orm_dag.has_task_concurrency_limits = any(t.task_concurrency is not None for t in dag.tasks)
+ orm_dag.has_task_concurrency_limits = any(t.max_active_tis_per_dag is not None for t in dag.tasks)
orm_dag.calculate_dagrun_date_fields(
dag,
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 098fd5b..cdc036e 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -412,7 +412,7 @@ class DagRun(Base, LoggingMixin):
unfinished_tasks = info.unfinished_tasks
none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks)
- none_task_concurrency = all(t.task.task_concurrency is None for t in unfinished_tasks)
+ none_task_concurrency = all(t.task.max_active_tis_per_dag is None for t in unfinished_tasks)
none_deferred = all(t.state != State.DEFERRED for t in unfinished_tasks)
if unfinished_tasks and none_depends_on_past and none_task_concurrency and none_deferred:
diff --git a/airflow/ti_deps/deps/task_concurrency_dep.py b/airflow/ti_deps/deps/task_concurrency_dep.py
index 6334511..264edb8 100644
--- a/airflow/ti_deps/deps/task_concurrency_dep.py
+++ b/airflow/ti_deps/deps/task_concurrency_dep.py
@@ -29,11 +29,11 @@ class TaskConcurrencyDep(BaseTIDep):
@provide_session
def _get_dep_statuses(self, ti, session, dep_context):
- if ti.task.task_concurrency is None:
+ if ti.task.max_active_tis_per_dag is None:
yield self._passing_status(reason="Task concurrency is not set.")
return
- if ti.get_num_running_task_instances(session) >= ti.task.task_concurrency:
+ if ti.get_num_running_task_instances(session) >= ti.task.max_active_tis_per_dag:
yield self._failing_status(reason="The max task concurrency has been reached.")
return
else:
diff --git a/docs/apache-airflow/faq.rst b/docs/apache-airflow/faq.rst
index 25be346..645c242 100644
--- a/docs/apache-airflow/faq.rst
+++ b/docs/apache-airflow/faq.rst
@@ -100,7 +100,7 @@ DAGs have configurations that improves efficiency:
Operators or tasks also have configurations that improves efficiency and scheduling priority:
-- ``task_concurrency``: This parameter controls the number of concurrent running task instances across ``dag_runs``
+- ``max_active_tis_per_dag``: This parameter controls the number of concurrent running task instances across ``dag_runs``
per task.
- ``pool``: See :ref:`concepts:pool`.
- ``priority_weight``: See :ref:`concepts:priority-weight`.
diff --git a/tests/conftest.py b/tests/conftest.py
index a212340..bcfc91f 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -612,7 +612,7 @@ def create_dummy_dag(dag_maker):
def create_dag(
dag_id='dag',
task_id='op1',
- task_concurrency=16,
+ max_active_tis_per_dag=16,
pool='default_pool',
executor_config={},
trigger_rule='all_done',
@@ -626,7 +626,7 @@ def create_dummy_dag(dag_maker):
with dag_maker(dag_id, **kwargs) as dag:
op = DummyOperator(
task_id=task_id,
- task_concurrency=task_concurrency,
+ max_active_tis_per_dag=max_active_tis_per_dag,
executor_config=executor_config,
on_success_callback=on_success_callback,
on_execute_callback=on_execute_callback,
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index cf3b5c5..ba9e0da 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -77,12 +77,12 @@ class TestBackfillJob:
dag_maker_fixture,
dag_id='test_dag',
pool=Pool.DEFAULT_POOL_NAME,
- task_concurrency=None,
+ max_active_tis_per_dag=None,
task_id='op',
**kwargs,
):
with dag_maker_fixture(dag_id=dag_id, schedule_interval='@daily', **kwargs) as dag:
- DummyOperator(task_id=task_id, pool=pool, task_concurrency=task_concurrency)
+ DummyOperator(task_id=task_id, pool=pool, max_active_tis_per_dag=max_active_tis_per_dag)
return dag
@@ -295,12 +295,12 @@ class TestBackfillJob:
assert conf_ == dr[0].conf
@patch('airflow.jobs.backfill_job.BackfillJob.log')
- def test_backfill_respect_task_concurrency_limit(self, mock_log, dag_maker):
- task_concurrency = 2
+ def test_backfill_respect_max_active_tis_per_dag_limit(self, mock_log, dag_maker):
+ max_active_tis_per_dag = 2
dag = self._get_dummy_dag(
dag_maker,
- dag_id='test_backfill_respect_task_concurrency_limit',
- task_concurrency=task_concurrency,
+ dag_id='test_backfill_respect_max_active_tis_per_dag_limit',
+ max_active_tis_per_dag=max_active_tis_per_dag,
)
dag_maker.create_dagrun()
@@ -321,9 +321,9 @@ class TestBackfillJob:
num_running_task_instances = 0
for running_task_instances in executor.history:
- assert len(running_task_instances) <= task_concurrency
+ assert len(running_task_instances) <= max_active_tis_per_dag
num_running_task_instances += len(running_task_instances)
- if len(running_task_instances) == task_concurrency:
+ if len(running_task_instances) == max_active_tis_per_dag:
task_concurrency_limit_reached_at_least_once = True
assert 8 == num_running_task_instances
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index f8e8e53..8902240 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -692,12 +692,12 @@ class TestSchedulerJob:
session.rollback()
# TODO: This is a hack, I think I need to just remove the setting and have it on always
- def test_find_executable_task_instances_task_concurrency(self, dag_maker):
- dag_id = 'SchedulerJobTest.test_find_executable_task_instances_task_concurrency'
+ def test_find_executable_task_instances_max_active_tis_per_dag(self, dag_maker):
+ dag_id = 'SchedulerJobTest.test_find_executable_task_instances_max_active_tis_per_dag'
task_id_1 = 'dummy'
task_id_2 = 'dummy2'
with dag_maker(dag_id=dag_id, max_active_tasks=16) as dag:
- task1 = DummyOperator(task_id=task_id_1, task_concurrency=2)
+ task1 = DummyOperator(task_id=task_id_1, max_active_tis_per_dag=2)
task2 = DummyOperator(task_id=task_id_2)
executor = MockExecutor(do_update=True)
@@ -2911,15 +2911,15 @@ class TestSchedulerJob:
],
],
)
- def test_dag_file_processor_process_task_instances_with_task_concurrency(
+ def test_dag_file_processor_process_task_instances_with_max_active_tis_per_dag(
self, state, start_date, end_date, dag_maker
):
"""
Test if _process_task_instances puts the right task instances into the
mock_list.
"""
- with dag_maker(dag_id='test_scheduler_process_execute_task_with_task_concurrency'):
- BashOperator(task_id='dummy', task_concurrency=2, bash_command='echo Hi')
+ with dag_maker(dag_id='test_scheduler_process_execute_task_with_max_active_tis_per_dag'):
+ BashOperator(task_id='dummy', max_active_tis_per_dag=2, bash_command='echo Hi')
self.scheduler_job = SchedulerJob(subdir=os.devnull)
self.scheduler_job.processor_agent = mock.MagicMock()
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 59f03f0..fb6a099 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -262,7 +262,7 @@ class TestDagRun(unittest.TestCase):
dag = DAG('test_dagrun_no_deadlock', start_date=DEFAULT_DATE)
with dag:
DummyOperator(task_id='dop', depends_on_past=True)
- DummyOperator(task_id='tc', task_concurrency=1)
+ DummyOperator(task_id='tc', max_active_tis_per_dag=1)
dag.clear()
dr = dag.create_dagrun(
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 35f2af3..ded8901 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -289,11 +289,11 @@ class TestTaskInstance:
ti.run()
assert ti.state == State.NONE
- def test_requeue_over_task_concurrency(self, create_dummy_dag):
+ def test_requeue_over_max_active_tis_per_dag(self, create_dummy_dag):
_, task = create_dummy_dag(
- dag_id='test_requeue_over_task_concurrency',
- task_id='test_requeue_over_task_concurrency_op',
- task_concurrency=0,
+ dag_id='test_requeue_over_max_active_tis_per_dag',
+ task_id='test_requeue_over_max_active_tis_per_dag_op',
+ max_active_tis_per_dag=0,
max_active_runs=1,
max_active_tasks=2,
)
@@ -310,7 +310,7 @@ class TestTaskInstance:
_, task = create_dummy_dag(
dag_id='test_requeue_over_pool_concurrency',
task_id='test_requeue_over_pool_concurrency_op',
- task_concurrency=0,
+ max_active_tis_per_dag=0,
max_active_runs=1,
max_active_tasks=2,
)
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 7e2798c..826b10f 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -887,6 +887,7 @@ class TestStringifiedDAGs(unittest.TestCase):
'executor_config': {},
'inlets': [],
'label': '10',
+ 'max_active_tis_per_dag': None,
'max_retry_delay': None,
'on_execute_callback': None,
'on_failure_callback': None,
@@ -907,7 +908,6 @@ class TestStringifiedDAGs(unittest.TestCase):
'sla': None,
'start_date': None,
'subdag': None,
- 'task_concurrency': None,
'task_id': '10',
'trigger_rule': 'all_success',
'wait_for_downstream': False,
diff --git a/tests/ti_deps/deps/test_task_concurrency.py b/tests/ti_deps/deps/test_task_concurrency.py
index e723c5a..fcc4067 100644
--- a/tests/ti_deps/deps/test_task_concurrency.py
+++ b/tests/ti_deps/deps/test_task_concurrency.py
@@ -38,14 +38,14 @@ class TestTaskConcurrencyDep(unittest.TestCase):
assert TaskConcurrencyDep().is_met(ti=ti, dep_context=dep_context)
def test_not_reached_concurrency(self):
- task = self._get_task(start_date=datetime(2016, 1, 1), task_concurrency=1)
+ task = self._get_task(start_date=datetime(2016, 1, 1), max_active_tis_per_dag=1)
dep_context = DepContext()
ti = Mock(task=task, execution_date=datetime(2016, 1, 1))
ti.get_num_running_task_instances = lambda x: 0
assert TaskConcurrencyDep().is_met(ti=ti, dep_context=dep_context)
def test_reached_concurrency(self):
- task = self._get_task(start_date=datetime(2016, 1, 1), task_concurrency=2)
+ task = self._get_task(start_date=datetime(2016, 1, 1), max_active_tis_per_dag=2)
dep_context = DepContext()
ti = Mock(task=task, execution_date=datetime(2016, 1, 1))
ti.get_num_running_task_instances = lambda x: 1