Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/28 22:09:17 UTC

[GitHub] [airflow] collinmcnulty opened a new issue, #23343: Silent DAG import error by making owner a list

collinmcnulty opened a new issue, #23343:
URL: https://github.com/apache/airflow/issues/23343

   ### Apache Airflow version
   
   2.2.5 (latest released)
   
   ### What happened
   
   If the `owner` argument is unhashable, such as a list, the DAG fails to import, but no import error is reported. If the DAG is new, it is simply missing. If this is an update to an existing DAG, the webserver continues to show the old version.
   
   ### What you think should happen instead
   
   A DAG import error should be raised.
   
   ### How to reproduce
   
   Set the `owner` argument for a task to be a list. See this minimal reproduction DAG.
   
   ```python
   from datetime import datetime
   from airflow.decorators import dag, task
   
   @dag(
       schedule_interval="@daily",
       start_date=datetime(2021, 1, 1),
       catchup=False,
       default_args={"owner": ["person"]},
       tags=['example'])
   def demo_bad_owner():
       @task()
       def say_hello():
           print("hello")
   
   demo_bad_owner()
   ```
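
For intuition, the failure mode can be sketched outside Airflow (an illustration, not Airflow's actual internals): a list is unhashable, so it fails as soon as it is used anywhere a hashable value is expected, such as a dict key or set member.

```python
# Illustration only, not Airflow code: a list owner blows up wherever
# a hashable value is expected (dict key, set member).
owner = ["person"]  # the owner value from the DAG above

try:
    seen_owners = {owner}  # set members must be hashable
except TypeError as exc:
    print(exc)  # unhashable type: 'list'
```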
   
   ### Operating System
   
   Debian Bullseye
   
   ### Versions of Apache Airflow Providers
   
   None needed.
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   The worker appears to still be able to execute the tasks when updating an existing DAG. Not sure how that's possible.
   
   Also reproduced on 2.3.0rc2.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] kaxil closed issue #23343: Silent DAG import error by making owner a list

Posted by GitBox <gi...@apache.org>.
kaxil closed issue #23343: Silent DAG import error by making owner a list
URL: https://github.com/apache/airflow/issues/23343




[GitHub] [airflow] tirkarthi commented on issue #23343: Silent DAG import error by making owner a list

Posted by GitBox <gi...@apache.org>.
tirkarthi commented on issue #23343:
URL: https://github.com/apache/airflow/issues/23343#issuecomment-1113256897

   Created https://github.com/apache/airflow/pull/23359 as a fix that checks the type and raises `AirflowClusterPolicyViolation` with an appropriate message.
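
A minimal sketch of that kind of check (not the actual diff in the PR; the stand-in exception class and message are illustrative):

```python
# Hedged sketch of an owner type check. ClusterPolicyViolation stands in
# for Airflow's AirflowClusterPolicyViolation; the message is illustrative.
class ClusterPolicyViolation(Exception):
    pass

def check_owner_is_string(task_owner):
    if not isinstance(task_owner, str):
        raise ClusterPolicyViolation(
            f"owner should be a string, got {type(task_owner).__name__}: {task_owner!r}"
        )

check_owner_is_string("person")      # passes silently
# check_owner_is_string(["person"])  # would raise ClusterPolicyViolation
```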




[GitHub] [airflow] tirkarthi commented on issue #23343: Silent DAG import error by making owner a list

Posted by GitBox <gi...@apache.org>.
tirkarthi commented on issue #23343:
URL: https://github.com/apache/airflow/issues/23343#issuecomment-1113211324

   I am unable to reproduce this with the test case below; adding the file to my dags folder loads it, even though the owner ends up empty.
   
   ```python
    def test_process_file_unhashable_owner(self):
        """Loading a DAG with owner as list"""
        dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=False)

        def create_dag():
            from airflow.decorators import dag

            @dag(default_args={'owners': ['owner1']})
            def my_flow_1():
                pass

            my_dag = my_flow_1()

        source_lines = [line[12:] for line in inspect.getsource(create_dag).splitlines(keepends=True)[1:]]
        with NamedTemporaryFile("w+", encoding="utf8") as tf_1:
            tf_1.writelines(source_lines)
            tf_1.flush()

            found_1 = dagbag.process_file(tf_1.name)
            found_1[0].sync_to_db()
            assert len(found_1) == 1 and found_1[0].dag_id == "my_flow_1"
            assert dagbag.import_errors == {}
            assert dagbag.dags['my_flow_1'] == found_1[0]

            with create_session() as session:
                dag_db = session.query(DagModel).filter(DagModel.dag_id == 'my_flow_1').first()
                assert dag_db is not None
   ```
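
A possible reason this test loads cleanly: the `default_args` key is `owners` (plural) rather than `owner`, so the list may never be applied to the task at all. A toy sketch of that kind of keyword filtering (illustrative only; `apply_default_args` and `make_task` are hypothetical names, not Airflow's actual `default_args` machinery):

```python
import inspect

def apply_default_args(func, default_args, kwargs):
    """Merge default_args into kwargs, keeping only keys func accepts."""
    sig = inspect.signature(func)
    merged = dict(kwargs)
    for key, value in default_args.items():
        if key in sig.parameters and key not in merged:
            merged[key] = value
    return merged

def make_task(task_id, owner="airflow"):
    return {"task_id": task_id, "owner": owner}

# 'owners' (plural) is not a parameter of make_task, so the list is dropped
kwargs = apply_default_args(make_task, {"owners": ["owner1"]}, {"task_id": "t1"})
print(kwargs)  # {'task_id': 't1'}
```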




[GitHub] [airflow] tirkarthi commented on issue #23343: Silent DAG import error by making owner a list

Posted by GitBox <gi...@apache.org>.
tirkarthi commented on issue #23343:
URL: https://github.com/apache/airflow/issues/23343#issuecomment-1113241669

   Ok, I am able to reproduce with the test below, where there are no import errors. I didn't have a cluster policy enabled in my local Airflow, which validates that the owner is non-default. With a non-string owner, it raises the error shown in the traceback below.
   
   ```python
    @patch("airflow.settings.task_policy", cluster_policies.cluster_policy)
    def test_task_cluster_policy_unhashable_owner(self):
        """
        test that file processing results in import error when task does not
        obey cluster policy and has non-string owner
        """
        dag_file = os.path.join(TEST_DAGS_FOLDER, "test_unhashable_owner.py")

        dagbag = DagBag(dag_folder=dag_file, include_smart_sensor=False, include_examples=False)
        assert set() == set(dagbag.dag_ids)
        assert dagbag.import_errors == {}
   ```
   
   ```test_unhashable_owner.py
    from datetime import timedelta

    from airflow import DAG
    from airflow.operators.empty import EmptyOperator
    from airflow.utils.dates import days_ago

    with DAG(
        dag_id="test_missing_owner",
        schedule_interval="0 0 * * *",
        start_date=days_ago(2),
        dagrun_timeout=timedelta(minutes=60),
        tags=["example"],
    ) as dag:
        # The original snippet was garbled by stray editor text; the task below
        # is reconstructed with the list owner from the issue's reproduction.
        run_this_last = EmptyOperator(
            task_id="run_this_last",
            owner=["person"],
        )
   ```
   
   ```
   [2022-04-29 12:09:05,046] {dagbag.py:507} INFO - Filling up the DagBag from /opt/airflow/tests/dags/test_unhashable_owner.py
   [2022-04-29 12:09:05,048] {dagbag.py:535} ERROR - 'list' object has no attribute 'lower'
   Traceback (most recent call last):
     File "/opt/airflow/airflow/models/dagbag.py", line 522, in collect_dags
       found_dags = self.process_file(filepath, only_if_updated=only_if_updated, safe_mode=safe_mode)
     File "/opt/airflow/airflow/models/dagbag.py", line 290, in process_file
       found_dags = self._process_modules(filepath, mods, file_last_changed_on_disk)
     File "/opt/airflow/airflow/models/dagbag.py", line 408, in _process_modules
       self.bag_dag(dag=dag, root_dag=dag)
     File "/opt/airflow/airflow/models/dagbag.py", line 433, in bag_dag
       self._bag_dag(dag=dag, root_dag=root_dag, recursive=True)
     File "/opt/airflow/airflow/models/dagbag.py", line 450, in _bag_dag
       settings.task_policy(task)
     File "/opt/airflow/tests/cluster_policies/__init__.py", line 64, in cluster_policy
       _check_task_rules(task)
     File "/opt/airflow/tests/cluster_policies/__init__.py", line 50, in _check_task_rules
       rule(current_task)
     File "/opt/airflow/tests/cluster_policies/__init__.py", line 30, in task_must_have_owners
       if not task.owner or task.owner.lower() == conf.get('operators', 'default_owner'):
   AttributeError: 'list' object has no attribute 'lower'
   ```
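
The `task_must_have_owners` rule from the traceback can be hardened so a non-string owner fails with a clear policy error instead of an `AttributeError`. A standalone sketch (the default-owner value and the use of `ValueError` are assumptions, not the repo's `tests/cluster_policies` code):

```python
DEFAULT_OWNER = "airflow"  # assumed value of [operators] default_owner

def task_must_have_owners(task):
    owner = getattr(task, "owner", None)
    # Check the type first so list/None owners fail with a clear message
    # instead of AttributeError: 'list' object has no attribute 'lower'.
    if not isinstance(owner, str):
        raise ValueError(f"task owner must be a string, got {owner!r}")
    if not owner or owner.lower() == DEFAULT_OWNER:
        raise ValueError("task must have a non-default owner")
```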

