You are viewing a plain-text version of this content. The canonical version is available at the original hyperlink, which is not preserved in this plain-text rendering.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/05/07 17:19:31 UTC

[airflow] branch v1-10-test updated: Show Deprecation warning on duplicate Task ids (#8728)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new 1ef5f13  Show Deprecation warning on duplicate Task ids (#8728)
1ef5f13 is described below

commit 1ef5f13377802eec91179d62630426b10b60f1ba
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Thu May 7 18:18:52 2020 +0100

    Show Deprecation warning on duplicate Task ids (#8728)
---
 airflow/models/baseoperator.py |  3 ++-
 airflow/models/dag.py          |  2 +-
 tests/models/test_dag.py       | 40 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 828fdb1..0f76770 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -546,7 +546,8 @@ class BaseOperator(LoggingMixin):
                 "The DAG assigned to {} can not be changed.".format(self))
         elif self.task_id not in dag.task_dict:
             dag.add_task(self)
-
+        elif self.task_id in dag.task_dict and dag.task_dict[self.task_id] is not self:
+            dag.add_task(self)
         self._dag = dag
 
     def has_dag(self):
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index c6b9171..4d7eef8 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1329,7 +1329,7 @@ class DAG(BaseDag, LoggingMixin):
         elif task.end_date and self.end_date:
             task.end_date = min(task.end_date, self.end_date)
 
-        if task.task_id in self.task_dict:
+        if task.task_id in self.task_dict and self.task_dict[task.task_id] is not task:
             # TODO: raise an error in Airflow 2.0
             warnings.warn(
                 'The requested task could not be added to the DAG because a '
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 580a3c7..cdbe1ee 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -25,6 +25,7 @@ import re
 import unittest
 from tempfile import NamedTemporaryFile
 
+import pytest
 from parameterized import parameterized
 from tests.compat import mock
 
@@ -36,6 +37,7 @@ from airflow import models, settings
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowDagCycleException
 from airflow.models import DAG, DagModel, TaskInstance as TI
+from airflow.operators.bash_operator import BashOperator
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.subdag_operator import SubDagOperator
 from airflow.utils import timezone
@@ -901,3 +903,41 @@ class DagTest(unittest.TestCase):
 
         self.assertEqual(dag.normalized_schedule_interval, expected_n_schedule_interval)
         self.assertEqual(dag.schedule_interval, schedule_interval)
+
+    def test_duplicate_task_ids_raise_warning_with_dag_context_manager(self):
+        """Verify tasks with Duplicate task_id show warning"""
+
+        deprecation_msg = "The requested task could not be added to the DAG because a task with " \
+                          "task_id t1 is already in the DAG. Starting in Airflow 2.0, trying to " \
+                          "overwrite a task will raise an exception."
+
+        with pytest.warns(PendingDeprecationWarning) as record:
+            with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
+                t1 = DummyOperator(task_id="t1")
+                t2 = BashOperator(task_id="t1", bash_command="sleep 1")
+                t1 >> t2
+
+            warning = record[0]
+            assert str(warning.message) == deprecation_msg
+            assert issubclass(PendingDeprecationWarning, warning.category)
+
+            self.assertEqual(dag.task_dict, {t1.task_id: t1})
+
+    def test_duplicate_task_ids_raise_warning(self):
+        """Verify tasks with Duplicate task_id show warning"""
+
+        deprecation_msg = "The requested task could not be added to the DAG because a task with " \
+                          "task_id t1 is already in the DAG. Starting in Airflow 2.0, trying to " \
+                          "overwrite a task will raise an exception."
+
+        with pytest.warns(PendingDeprecationWarning) as record:
+            dag = DAG("test_dag", start_date=DEFAULT_DATE)
+            t1 = DummyOperator(task_id="t1", dag=dag)
+            t2 = BashOperator(task_id="t1", bash_command="sleep 1", dag=dag)
+            t1 >> t2
+
+            warning = record[0]
+            assert str(warning.message) == deprecation_msg
+            assert issubclass(PendingDeprecationWarning, warning.category)
+
+            self.assertEqual(dag.task_dict, {t1.task_id: t1})