Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/05/05 23:04:45 UTC

[GitHub] [airflow] kaxil opened a new pull request #8728: Show Deprecation warning on duplicate Task ids

kaxil opened a new pull request #8728:
URL: https://github.com/apache/airflow/pull/8728


   We already raise the warning at https://github.com/apache/airflow/blob/1.10.10/airflow/models/dag.py#L1318-L1328 but that wasn't enough.
   
   **Before**:
   
   ```
   airflow ❯ python -c "import airflow; from airflow.operators.dummy_operator import DummyOperator; dag = airflow.DAG(dag_id='test', start_date=airflow.utils.timezone.utcnow()); t1 = DummyOperator(dag=dag, task_id='t1'); t1_2 = DummyOperator(dag=dag, task_id='t1')"
   ```
   
   **After**:
   
   ```
   airflow ❯ python -c "import airflow; from airflow.operators.dummy_operator import DummyOperator; dag = airflow.DAG(dag_id='test', start_date=airflow.utils.timezone.utcnow()); t1 = DummyOperator(dag=dag, task_id='t1'); t1_2 = DummyOperator(dag=dag, task_id='t1')"
   /opt/airflow/airflow/models/baseoperator.py:555: PendingDeprecationWarning: The requested task could not be added to the DAG because a task with task_id t1 is already in the DAG. Starting in Airflow 2.0, trying to overwrite a task will raise an exception.
     category=PendingDeprecationWarning)
   ```
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] JeffryMAC edited a comment on pull request #8728: Show Deprecation warning on duplicate Task ids

Posted by GitBox <gi...@apache.org>.
JeffryMAC edited a comment on pull request #8728:
URL: https://github.com/apache/airflow/pull/8728#issuecomment-705734824


   @kaxil I use a context manager with `as dag` for every DAG I have.
   The `as dag` name is not the dag_id, so why does it matter?





[GitHub] [airflow] kaxil commented on pull request #8728: Show Deprecation warning on duplicate Task ids

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #8728:
URL: https://github.com/apache/airflow/pull/8728#issuecomment-705668337


   > @kaxil this is causing an issue for me if I have the same task_id in 2 different DAGs
   
   It should not show a warning for tasks in different DAGs, unless you are using the context manager and have ` as dag:` for both of your DAGs.








[GitHub] [airflow] ashb commented on pull request #8728: Show Deprecation warning on duplicate Task ids

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #8728:
URL: https://github.com/apache/airflow/pull/8728#issuecomment-625528936


   Mentioning this only because it's related to this PR, not because it's a bug in this PR.
   
   I think I've just found a problem in our example DAGs caused by our use of `!=` in master:
   
   https://github.com/apache/airflow/blob/6e4f5fa66ebe2d8252829c67e79f895fa5029b5a/airflow/providers/google/cloud/example_dags/example_gcs.py#L129-L130








[GitHub] [airflow] kaxil commented on pull request #8728: Show Deprecation warning on duplicate Task ids

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #8728:
URL: https://github.com/apache/airflow/pull/8728#issuecomment-624935065


   CI failures are unrelated. @potiuk any idea?
   








[GitHub] [airflow] tooptoop4 commented on pull request #8728: Show Deprecation warning on duplicate Task ids

Posted by GitBox <gi...@apache.org>.
tooptoop4 commented on pull request #8728:
URL: https://github.com/apache/airflow/pull/8728#issuecomment-705669201


   @kaxil raised https://github.com/apache/airflow/issues/11354





[GitHub] [airflow] kaxil commented on a change in pull request #8728: Show Deprecation warning on duplicate Task ids

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8728:
URL: https://github.com/apache/airflow/pull/8728#discussion_r420949756



##########
File path: tests/models/test_dag.py
##########
@@ -901,3 +902,61 @@ def test_normalized_schedule_interval(
 
         self.assertEqual(dag.normalized_schedule_interval, expected_n_schedule_interval)
         self.assertEqual(dag.schedule_interval, schedule_interval)
+
+    def test_duplicate_task_ids_not_allowed_with_dag_context_manager(self):
+        """Verify tasks with Duplicate task_id show warning"""
+        with self.assertWarnsRegex(
+            PendingDeprecationWarning,
+            "The requested task could not be added to the DAG because a task with "
+            "task_id t1 is already in the DAG. Starting in Airflow 2.0, trying to "
+            "overwrite a task will raise an exception."
+        ):
+            with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
+                t1 = DummyOperator(task_id="t1")
+                t2 = BashOperator(task_id="t1", bash_command="sleep 1")
+                t1 >> t2
+
+        self.assertEqual(dag.task_dict, {t1.task_id: t1})
+
+        # Also verify that DAGs with duplicate task_ids don't raise errors
+        with DAG("test_dag_1", start_date=DEFAULT_DATE) as dag1:
+            t3 = DummyOperator(task_id="t3")
+            t4 = DummyOperator(task_id="t3")
+            t3 >> t4
+
+        self.assertEqual(dag1.task_dict, {t3.task_id: t3, t4.task_id: t4})

Review comment:
       Yes, removed. Thanks for catching that.







[GitHub] [airflow] kaxil commented on pull request #8728: Show Deprecation warning on duplicate Task ids

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #8728:
URL: https://github.com/apache/airflow/pull/8728#issuecomment-705738620


   > @kaxil I use a context manager with `as dag` for every DAG I have.
   > The `as dag` name is not the dag_id, so why does it matter?
   
   You are right, in theory it should not, although I roughly remember I had to change:
   
   ```
   with DAG(dag_id='d1') as dag:
        t1 = BashOperator(...)
   
   with DAG(dag_id='d2') as dag:
        t1 = BashOperator(...)
   ```
   
   to
   
   ```
   with DAG(dag_id='d1') as dag_1:
        t1 = BashOperator(...)
   
   with DAG(dag_id='d2') as dag_2:
        t1 = BashOperator(...)
   ```
   
   Can't remember why and when though
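
One thing the rename does change, independent of any Airflow internals, is which objects stay bound to a top-level name once the file has run. The sketch below uses a hypothetical `FakeDag` stand-in and a hypothetical `collect_dag_ids` helper that scans module-level names; it assumes a loader that discovers DAGs that way, which this thread does not confirm, and it is not offered as the cause of the warning discussed above.

```python
class FakeDag:
    """Hypothetical stand-in for a DAG: a context manager with a dag_id."""

    def __init__(self, dag_id):
        self.dag_id = dag_id

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        return False


def collect_dag_ids(source):
    """Run `source` as a module body and return the dag_ids of the FakeDag
    objects that are still bound to a top-level name afterwards."""
    namespace = {"FakeDag": FakeDag}
    exec(source, namespace)
    return sorted(
        {obj.dag_id for obj in namespace.values() if isinstance(obj, FakeDag)}
    )


# Both blocks bind the same name, so the second binding replaces the first.
SAME_NAME = """
with FakeDag(dag_id='d1') as dag:
    pass
with FakeDag(dag_id='d2') as dag:
    pass
"""

# Distinct names keep both objects reachable from the module namespace.
DISTINCT_NAMES = """
with FakeDag(dag_id='d1') as dag_1:
    pass
with FakeDag(dag_id='d2') as dag_2:
    pass
"""

same_name_ids = collect_dag_ids(SAME_NAME)            # only 'd2' survives
distinct_name_ids = collect_dag_ids(DISTINCT_NAMES)   # 'd1' and 'd2'
```

With a shared `as dag` name, a name-scanning loader would only see the last DAG; with `dag_1` and `dag_2` it would see both.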
   





[GitHub] [airflow] jhtimmins commented on a change in pull request #8728: Show Deprecation warning on duplicate Task ids

Posted by GitBox <gi...@apache.org>.
jhtimmins commented on a change in pull request #8728:
URL: https://github.com/apache/airflow/pull/8728#discussion_r420490603



##########
File path: tests/models/test_dag.py
##########
@@ -901,3 +902,61 @@ def test_normalized_schedule_interval(
 
         self.assertEqual(dag.normalized_schedule_interval, expected_n_schedule_interval)
         self.assertEqual(dag.schedule_interval, schedule_interval)
+
+    def test_duplicate_task_ids_not_allowed_with_dag_context_manager(self):
+        """Verify tasks with Duplicate task_id show warning"""
+        with self.assertWarnsRegex(
+            PendingDeprecationWarning,
+            "The requested task could not be added to the DAG because a task with "
+            "task_id t1 is already in the DAG. Starting in Airflow 2.0, trying to "
+            "overwrite a task will raise an exception."
+        ):
+            with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
+                t1 = DummyOperator(task_id="t1")
+                t2 = BashOperator(task_id="t1", bash_command="sleep 1")
+                t1 >> t2
+
+        self.assertEqual(dag.task_dict, {t1.task_id: t1})
+
+        # Also verify that DAGs with duplicate task_ids don't raise errors
+        with DAG("test_dag_1", start_date=DEFAULT_DATE) as dag1:
+            t3 = DummyOperator(task_id="t3")
+            t4 = DummyOperator(task_id="t3")
+            t3 >> t4
+
+        self.assertEqual(dag1.task_dict, {t3.task_id: t3, t4.task_id: t4})

Review comment:
       I don't quite follow why the preceding two cases behave differently. Can tasks only share a task_id if they use the same operator type?
   
   Might be worth clarifying the difference somewhere.

##########
File path: tests/models/test_dag.py
##########
@@ -901,3 +902,61 @@ def test_normalized_schedule_interval(
 
         self.assertEqual(dag.normalized_schedule_interval, expected_n_schedule_interval)
         self.assertEqual(dag.schedule_interval, schedule_interval)
+
+    def test_duplicate_task_ids_not_allowed_with_dag_context_manager(self):
+        """Verify tasks with Duplicate task_id show warning"""
+        with self.assertWarnsRegex(
+            PendingDeprecationWarning,
+            "The requested task could not be added to the DAG because a task with "
+            "task_id t1 is already in the DAG. Starting in Airflow 2.0, trying to "
+            "overwrite a task will raise an exception."

Review comment:
       Testing for the exact response message is likely to make this case brittle. Here are some possible alternatives:
   
   1. Create a class constant on Dag that stores this string, and use it both in the implementation and the test.
   2. Same as 1, but instead put it in a `warnings.py` file or similar. 
   3. Same as 1, but add some type of Warning or Alert mixin.
   4. Create a child class of `PendingDeprecationWarning` that encompasses the message body, then just test for the exception type.
   5. Only modify the test and check for the presence of `t1` and `2.0`.
   
   4 probably makes the most sense. Can add 5 to it for extra safety that string interpolation is working.
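
A minimal sketch of options 4 and 5 combined might look like the following. `DuplicateTaskIdWarning` and `add_task` are hypothetical names used only for illustration and are not part of Airflow; the message text is the one from the diff above.

```python
import unittest
import warnings


class DuplicateTaskIdWarning(PendingDeprecationWarning):
    """Hypothetical warning class that owns the message template."""

    def __init__(self, task_id):
        self.task_id = task_id
        super().__init__(
            "The requested task could not be added to the DAG because a task "
            "with task_id {} is already in the DAG. Starting in Airflow 2.0, "
            "trying to overwrite a task will raise an exception.".format(task_id)
        )


def add_task(task_dict, task_id, task):
    """Hypothetical stand-in for the code path that registers a task."""
    if task_id in task_dict:
        # Warn and keep the task that was registered first.
        warnings.warn(DuplicateTaskIdWarning(task_id))
        return
    task_dict[task_id] = task


class TestDuplicateTaskId(unittest.TestCase):
    def test_duplicate_task_id_warns(self):
        task_dict = {}
        add_task(task_dict, "t1", object())

        # Option 4: assert on the warning type rather than the full text.
        with self.assertWarns(DuplicateTaskIdWarning) as caught:
            add_task(task_dict, "t1", object())

        # Option 5: spot-check that interpolation produced the key details.
        message = str(caught.warning)
        self.assertIn("t1", message)
        self.assertIn("2.0", message)
        self.assertEqual(list(task_dict), ["t1"])
```

The trade-off is that rewording the message no longer breaks the test, while a change to the warning type or a broken interpolation still does.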

##########
File path: tests/models/test_dag.py
##########
@@ -901,3 +902,61 @@ def test_normalized_schedule_interval(
 
         self.assertEqual(dag.normalized_schedule_interval, expected_n_schedule_interval)
         self.assertEqual(dag.schedule_interval, schedule_interval)
+
+    def test_duplicate_task_ids_not_allowed_with_dag_context_manager(self):
+        """Verify tasks with Duplicate task_id show warning"""
+        with self.assertWarnsRegex(
+            PendingDeprecationWarning,
+            "The requested task could not be added to the DAG because a task with "
+            "task_id t1 is already in the DAG. Starting in Airflow 2.0, trying to "
+            "overwrite a task will raise an exception."
+        ):
+            with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
+                t1 = DummyOperator(task_id="t1")
+                t2 = BashOperator(task_id="t1", bash_command="sleep 1")
+                t1 >> t2
+
+        self.assertEqual(dag.task_dict, {t1.task_id: t1})
+
+        # Also verify that DAGs with duplicate task_ids don't raise errors
+        with DAG("test_dag_1", start_date=DEFAULT_DATE) as dag1:
+            t3 = DummyOperator(task_id="t3")
+            t4 = DummyOperator(task_id="t3")
+            t3 >> t4
+
+        self.assertEqual(dag1.task_dict, {t3.task_id: t3, t4.task_id: t4})
+
+    def test_duplicate_task_ids_not_allowed_without_dag_context_manager(self):

Review comment:
       This name is a bit ambiguous. It implies that duplicate task ids aren't allowed _unless_ there's a context manager.

##########
File path: tests/models/test_dag.py
##########
@@ -901,3 +902,61 @@ def test_normalized_schedule_interval(
 
         self.assertEqual(dag.normalized_schedule_interval, expected_n_schedule_interval)
         self.assertEqual(dag.schedule_interval, schedule_interval)
+
+    def test_duplicate_task_ids_not_allowed_with_dag_context_manager(self):
+        """Verify tasks with Duplicate task_id show warning"""
+        with self.assertWarnsRegex(
+            PendingDeprecationWarning,
+            "The requested task could not be added to the DAG because a task with "
+            "task_id t1 is already in the DAG. Starting in Airflow 2.0, trying to "
+            "overwrite a task will raise an exception."
+        ):
+            with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
+                t1 = DummyOperator(task_id="t1")
+                t2 = BashOperator(task_id="t1", bash_command="sleep 1")
+                t1 >> t2
+
+        self.assertEqual(dag.task_dict, {t1.task_id: t1})
+
+        # Also verify that DAGs with duplicate task_ids don't raise errors
+        with DAG("test_dag_1", start_date=DEFAULT_DATE) as dag1:
+            t3 = DummyOperator(task_id="t3")
+            t4 = DummyOperator(task_id="t3")
+            t3 >> t4
+
+        self.assertEqual(dag1.task_dict, {t3.task_id: t3, t4.task_id: t4})
+
+    def test_duplicate_task_ids_not_allowed_without_dag_context_manager(self):
+        """Verify tasks with Duplicate task_id show warning"""
+        with self.assertWarnsRegex(
+            PendingDeprecationWarning,
+            "The requested task could not be added to the DAG because a task with "
+            "task_id t1 is already in the DAG. Starting in Airflow 2.0, trying to "
+            "overwrite a task will raise an exception."
+        ):
+            dag = DAG("test_dag", start_date=DEFAULT_DATE)
+            t1 = DummyOperator(task_id="t1", dag=dag)
+            t2 = BashOperator(task_id="t1", bash_command="sleep 1", dag=dag)
+            t1 >> t2
+
+        self.assertEqual(dag.task_dict, {t1.task_id: t1})
+
+        # Also verify that DAGs with duplicate task_ids don't raise errors
+        dag1 = DAG("test_dag_1", start_date=DEFAULT_DATE)
+        t3 = DummyOperator(task_id="t3", dag=dag1)
+        t4 = DummyOperator(task_id="t4", dag=dag1)

Review comment:
       These task_ids aren't duplicates, though the comment above suggests they're supposed to be. Possible bug?










[GitHub] [airflow] kaxil commented on a change in pull request #8728: Show Deprecation warning on duplicate Task ids

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8728:
URL: https://github.com/apache/airflow/pull/8728#discussion_r420949537



##########
File path: tests/models/test_dag.py
##########
@@ -901,3 +902,61 @@ def test_normalized_schedule_interval(
 
         self.assertEqual(dag.normalized_schedule_interval, expected_n_schedule_interval)
         self.assertEqual(dag.schedule_interval, schedule_interval)
+
+    def test_duplicate_task_ids_not_allowed_with_dag_context_manager(self):
+        """Verify tasks with Duplicate task_id show warning"""
+        with self.assertWarnsRegex(
+            PendingDeprecationWarning,
+            "The requested task could not be added to the DAG because a task with "
+            "task_id t1 is already in the DAG. Starting in Airflow 2.0, trying to "
+            "overwrite a task will raise an exception."
+        ):
+            with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
+                t1 = DummyOperator(task_id="t1")
+                t2 = BashOperator(task_id="t1", bash_command="sleep 1")
+                t1 >> t2
+
+        self.assertEqual(dag.task_dict, {t1.task_id: t1})
+
+        # Also verify that DAGs with duplicate task_ids don't raise errors
+        with DAG("test_dag_1", start_date=DEFAULT_DATE) as dag1:
+            t3 = DummyOperator(task_id="t3")
+            t4 = DummyOperator(task_id="t3")
+            t3 >> t4
+
+        self.assertEqual(dag1.task_dict, {t3.task_id: t3, t4.task_id: t4})
+
+    def test_duplicate_task_ids_not_allowed_without_dag_context_manager(self):

Review comment:
       Fixed







[GitHub] [airflow] tooptoop4 commented on pull request #8728: Show Deprecation warning on duplicate Task ids

Posted by GitBox <gi...@apache.org>.
tooptoop4 commented on pull request #8728:
URL: https://github.com/apache/airflow/pull/8728#issuecomment-705666361


   @kaxil this is causing an issue for me if I have the same task_id in 2 different DAGs





[GitHub] [airflow] potiuk commented on pull request #8728: Show Deprecation warning on duplicate Task ids

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #8728:
URL: https://github.com/apache/airflow/pull/8728#issuecomment-625054042


   Will be fixed by #8758 





[GitHub] [airflow] kaxil commented on a change in pull request #8728: Show Deprecation warning on duplicate Task ids

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8728:
URL: https://github.com/apache/airflow/pull/8728#discussion_r421455457



##########
File path: tests/models/test_dag.py
##########
@@ -901,3 +902,61 @@ def test_normalized_schedule_interval(
 
         self.assertEqual(dag.normalized_schedule_interval, expected_n_schedule_interval)
         self.assertEqual(dag.schedule_interval, schedule_interval)
+
+    def test_duplicate_task_ids_not_allowed_with_dag_context_manager(self):
+        """Verify tasks with Duplicate task_id show warning"""
+        with self.assertWarnsRegex(
+            PendingDeprecationWarning,
+            "The requested task could not be added to the DAG because a task with "
+            "task_id t1 is already in the DAG. Starting in Airflow 2.0, trying to "
+            "overwrite a task will raise an exception."

Review comment:
       The main reason we test for the exact message is that otherwise the test would still pass even if someone changed the message in warnings.py.










[GitHub] [airflow] kaxil commented on a change in pull request #8728: Show Deprecation warning on duplicate Task ids

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8728:
URL: https://github.com/apache/airflow/pull/8728#discussion_r420948075



##########
File path: tests/models/test_dag.py
##########
@@ -901,3 +902,61 @@ def test_normalized_schedule_interval(
 
         self.assertEqual(dag.normalized_schedule_interval, expected_n_schedule_interval)
         self.assertEqual(dag.schedule_interval, schedule_interval)
+
+    def test_duplicate_task_ids_not_allowed_with_dag_context_manager(self):
+        """Verify tasks with Duplicate task_id show warning"""
+        with self.assertWarnsRegex(
+            PendingDeprecationWarning,
+            "The requested task could not be added to the DAG because a task with "
+            "task_id t1 is already in the DAG. Starting in Airflow 2.0, trying to "
+            "overwrite a task will raise an exception."
+        ):
+            with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
+                t1 = DummyOperator(task_id="t1")
+                t2 = BashOperator(task_id="t1", bash_command="sleep 1")
+                t1 >> t2
+
+        self.assertEqual(dag.task_dict, {t1.task_id: t1})
+
+        # Also verify that DAGs with duplicate task_ids don't raise errors
+        with DAG("test_dag_1", start_date=DEFAULT_DATE) as dag1:
+            t3 = DummyOperator(task_id="t3")
+            t4 = DummyOperator(task_id="t3")
+            t3 >> t4
+
+        self.assertEqual(dag1.task_dict, {t3.task_id: t3, t4.task_id: t4})
+
+    def test_duplicate_task_ids_not_allowed_without_dag_context_manager(self):
+        """Verify tasks with Duplicate task_id show warning"""
+        with self.assertWarnsRegex(
+            PendingDeprecationWarning,
+            "The requested task could not be added to the DAG because a task with "
+            "task_id t1 is already in the DAG. Starting in Airflow 2.0, trying to "
+            "overwrite a task will raise an exception."
+        ):
+            dag = DAG("test_dag", start_date=DEFAULT_DATE)
+            t1 = DummyOperator(task_id="t1", dag=dag)
+            t2 = BashOperator(task_id="t1", bash_command="sleep 1", dag=dag)
+            t1 >> t2
+
+        self.assertEqual(dag.task_dict, {t1.task_id: t1})
+
+        # Also verify that DAGs with duplicate task_ids don't raise errors
+        dag1 = DAG("test_dag_1", start_date=DEFAULT_DATE)
+        t3 = DummyOperator(task_id="t3", dag=dag1)
+        t4 = DummyOperator(task_id="t4", dag=dag1)

Review comment:
       Removed. I can't seem to remember why I added that test.







[GitHub] [airflow] ashb commented on a change in pull request #8728: Show Deprecation warning on duplicate Task ids

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #8728:
URL: https://github.com/apache/airflow/pull/8728#discussion_r421406346



##########
File path: airflow/models/baseoperator.py
##########
@@ -546,7 +546,8 @@ def dag(self, dag):
                 "The DAG assigned to {} can not be changed.".format(self))
         elif self.task_id not in dag.task_dict:
             dag.add_task(self)
-
+        elif self.task_id in dag.task_dict and dag.task_dict[self.task_id] != self:

Review comment:
       Do we want `dag.task_dict[self.task_id] != self`, or `dag.task_dict[self.task_id] is not self`?
   
   The former uses `__eq__`; the latter checks whether it is exactly the same object.
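
To illustrate the difference, here is a small sketch with a hypothetical `Task` class whose `__eq__` compares attributes. It is an assumption for illustration only and does not reproduce how `BaseOperator` actually implements equality.

```python
class Task:
    """Hypothetical operator stand-in whose equality compares attributes."""

    def __init__(self, task_id, command):
        self.task_id = task_id
        self.command = command

    def __eq__(self, other):
        if not isinstance(other, Task):
            return NotImplemented
        return (self.task_id, self.command) == (other.task_id, other.command)

    def __hash__(self):
        return hash((self.task_id, self.command))


registered = Task("t1", "sleep 1")
lookalike = Task("t1", "sleep 1")  # a separate object with equal attributes

# `!=` goes through __eq__, so the two tasks are not considered different.
differs_by_eq = registered != lookalike

# `is not` compares identity, so a second object is always detected.
differs_by_identity = registered is not lookalike
```

With `!=`, a second task that happens to compare equal to the registered one would slip past the duplicate check; with `is not`, any object other than the one already registered triggers it.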







[GitHub] [airflow] kaxil commented on a change in pull request #8728: Show Deprecation warning on duplicate Task ids

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8728:
URL: https://github.com/apache/airflow/pull/8728#discussion_r421457539



##########
File path: airflow/models/baseoperator.py
##########
@@ -546,7 +546,8 @@ def dag(self, dag):
                 "The DAG assigned to {} can not be changed.".format(self))
         elif self.task_id not in dag.task_dict:
             dag.add_task(self)
-
+        elif self.task_id in dag.task_dict and dag.task_dict[self.task_id] != self:

Review comment:
       Updated. I will update Airflow master as well, since we currently raise the error based on an `==` check:
   
   https://github.com/kaxil/airflow/blob/c3a46b9fec618489314a4ac8d4be289481e8edd3/airflow/models/baseoperator.py#L603-L606



