You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/05/06 17:01:54 UTC

[GitHub] [airflow] kaxil commented on a change in pull request #8728: Show Deprecation warning on duplicate Task ids

kaxil commented on a change in pull request #8728:
URL: https://github.com/apache/airflow/pull/8728#discussion_r420948075



##########
File path: tests/models/test_dag.py
##########
@@ -901,3 +902,61 @@ def test_normalized_schedule_interval(
 
         self.assertEqual(dag.normalized_schedule_interval, expected_n_schedule_interval)
         self.assertEqual(dag.schedule_interval, schedule_interval)
+
+    def test_duplicate_task_ids_not_allowed_with_dag_context_manager(self):
+        """Verify tasks with Duplicate task_id show warning"""
+        with self.assertWarnsRegex(
+            PendingDeprecationWarning,
+            "The requested task could not be added to the DAG because a task with "
+            "task_id t1 is already in the DAG. Starting in Airflow 2.0, trying to "
+            "overwrite a task will raise an exception."
+        ):
+            with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
+                t1 = DummyOperator(task_id="t1")
+                t2 = BashOperator(task_id="t1", bash_command="sleep 1")
+                t1 >> t2
+
+        self.assertEqual(dag.task_dict, {t1.task_id: t1})
+
+        # Also verify that DAGs with duplicate task_ids don't raise errors
+        with DAG("test_dag_1", start_date=DEFAULT_DATE) as dag1:
+            t3 = DummyOperator(task_id="t3")
+            t4 = DummyOperator(task_id="t3")
+            t3 >> t4
+
+        self.assertEqual(dag1.task_dict, {t3.task_id: t3, t4.task_id: t4})
+
+    def test_duplicate_task_ids_not_allowed_without_dag_context_manager(self):
+        """Verify tasks with Duplicate task_id show warning"""
+        with self.assertWarnsRegex(
+            PendingDeprecationWarning,
+            "The requested task could not be added to the DAG because a task with "
+            "task_id t1 is already in the DAG. Starting in Airflow 2.0, trying to "
+            "overwrite a task will raise an exception."
+        ):
+            dag = DAG("test_dag", start_date=DEFAULT_DATE)
+            t1 = DummyOperator(task_id="t1", dag=dag)
+            t2 = BashOperator(task_id="t1", bash_command="sleep 1", dag=dag)
+            t1 >> t2
+
+        self.assertEqual(dag.task_dict, {t1.task_id: t1})
+
+        # Also verify that DAGs with duplicate task_ids don't raise errors
+        dag1 = DAG("test_dag_1", start_date=DEFAULT_DATE)
+        t3 = DummyOperator(task_id="t3", dag=dag1)
+        t4 = DummyOperator(task_id="t4", dag=dag1)

Review comment:
       Removed, I can't seem to remember why I add that test




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org