You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/05/07 17:19:31 UTC
[airflow] branch v1-10-test updated: Show Deprecation warning on duplicate Task ids (#8728)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push:
new 1ef5f13 Show Deprecation warning on duplicate Task ids (#8728)
1ef5f13 is described below
commit 1ef5f13377802eec91179d62630426b10b60f1ba
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Thu May 7 18:18:52 2020 +0100
Show Deprecation warning on duplicate Task ids (#8728)
---
airflow/models/baseoperator.py | 3 ++-
airflow/models/dag.py | 2 +-
tests/models/test_dag.py | 40 ++++++++++++++++++++++++++++++++++++++++
3 files changed, 43 insertions(+), 2 deletions(-)
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 828fdb1..0f76770 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -546,7 +546,8 @@ class BaseOperator(LoggingMixin):
"The DAG assigned to {} can not be changed.".format(self))
elif self.task_id not in dag.task_dict:
dag.add_task(self)
-
+ elif self.task_id in dag.task_dict and dag.task_dict[self.task_id] is not self:
+ dag.add_task(self)
self._dag = dag
def has_dag(self):
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index c6b9171..4d7eef8 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1329,7 +1329,7 @@ class DAG(BaseDag, LoggingMixin):
elif task.end_date and self.end_date:
task.end_date = min(task.end_date, self.end_date)
- if task.task_id in self.task_dict:
+ if task.task_id in self.task_dict and self.task_dict[task.task_id] is not task:
# TODO: raise an error in Airflow 2.0
warnings.warn(
'The requested task could not be added to the DAG because a '
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 580a3c7..cdbe1ee 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -25,6 +25,7 @@ import re
import unittest
from tempfile import NamedTemporaryFile
+import pytest
from parameterized import parameterized
from tests.compat import mock
@@ -36,6 +37,7 @@ from airflow import models, settings
from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowDagCycleException
from airflow.models import DAG, DagModel, TaskInstance as TI
+from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils import timezone
@@ -901,3 +903,41 @@ class DagTest(unittest.TestCase):
self.assertEqual(dag.normalized_schedule_interval, expected_n_schedule_interval)
self.assertEqual(dag.schedule_interval, schedule_interval)
+
+    def test_duplicate_task_ids_raise_warning_with_dag_context_manager(self):
+        """Verify a duplicate task_id inside a DAG context manager warns and keeps the first task."""
+
+        deprecation_msg = "The requested task could not be added to the DAG because a task with " \
+                          "task_id t1 is already in the DAG. Starting in Airflow 2.0, trying to " \
+                          "overwrite a task will raise an exception."
+
+        with pytest.warns(PendingDeprecationWarning) as record:
+            with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
+                t1 = DummyOperator(task_id="t1")
+                t2 = BashOperator(task_id="t1", bash_command="sleep 1")
+                t1 >> t2
+
+        warning = record[0]
+        self.assertEqual(str(warning.message), deprecation_msg)
+        self.assertTrue(issubclass(warning.category, PendingDeprecationWarning))
+        # The duplicate (t2) is rejected, so "t1" must still map to the original task.
+        self.assertEqual(dag.task_dict, {t1.task_id: t1})
+
+    def test_duplicate_task_ids_raise_warning(self):
+        """Verify a duplicate task_id passed via ``dag=`` warns and keeps the first task."""
+
+        deprecation_msg = "The requested task could not be added to the DAG because a task with " \
+                          "task_id t1 is already in the DAG. Starting in Airflow 2.0, trying to " \
+                          "overwrite a task will raise an exception."
+
+        with pytest.warns(PendingDeprecationWarning) as record:
+            dag = DAG("test_dag", start_date=DEFAULT_DATE)
+            t1 = DummyOperator(task_id="t1", dag=dag)
+            t2 = BashOperator(task_id="t1", bash_command="sleep 1", dag=dag)
+            t1 >> t2
+
+        warning = record[0]
+        self.assertEqual(str(warning.message), deprecation_msg)
+        self.assertTrue(issubclass(warning.category, PendingDeprecationWarning))
+        # The duplicate (t2) is rejected, so "t1" must still map to the original task.
+        self.assertEqual(dag.task_dict, {t1.task_id: t1})