Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/28 22:09:17 UTC
[GitHub] [airflow] collinmcnulty opened a new issue, #23343: Silent DAG import error by making owner a list
collinmcnulty opened a new issue, #23343:
URL: https://github.com/apache/airflow/issues/23343
### Apache Airflow version
2.2.5 (latest released)
### What happened
If the `owner` argument is unhashable, such as a list, the DAG fails to import, but no import error is reported. If the DAG is new, it is simply missing. If it is an update to an existing DAG, the webserver continues to show the old version.
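To illustrate what "unhashable" means here: a Python list cannot be a member of a set or a dictionary key, while a string can. The sketch below is plain Python and does not use Airflow. The idea that owners get collected into a set somewhere during import is an assumption, not something confirmed in this report, and `collect_owners` is a hypothetical helper.

```python
# Plain-Python illustration; no Airflow involved.
# `collect_owners` is a hypothetical helper, not an Airflow function.


def collect_owners(owners):
    """De-duplicate owners via a set, then join them for display."""
    return ", ".join(sorted(set(owners)))


# String owners are hashable, so de-duplication works.
print(collect_owners(["person", "person", "team"]))

# A list owner is unhashable, so building the set raises TypeError.
try:
    collect_owners([["person"]])
except TypeError as exc:
    print(f"TypeError: {exc}")
```

If something like this happens outside the code path that records import errors, the failure would only show up in logs, which would match the symptom described.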
### What you think should happen instead
A DAG import error should be raised.
### How to reproduce
Set the `owner` argument for a task to be a list. See this minimal reproduction DAG.
```
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    schedule_interval="@daily",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    default_args={"owner": ["person"]},  # owner is a list, not a string
    tags=['example'])
def demo_bad_owner():
    @task()
    def say_hello():
        print("hello")

    # Call the task so it is actually added to the DAG.
    say_hello()


demo_bad_owner()
```
### Operating System
Debian Bullseye
### Versions of Apache Airflow Providers
None needed.
### Deployment
Astronomer
### Deployment details
_No response_
### Anything else
The worker appears to still be able to execute the tasks when updating an existing DAG. Not sure how that's possible.
Also reproduced on 2.3.0rc2.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] kaxil closed issue #23343: Silent DAG import error by making owner a list
Posted by GitBox <gi...@apache.org>.
kaxil closed issue #23343: Silent DAG import error by making owner a list
URL: https://github.com/apache/airflow/issues/23343
[GitHub] [airflow] tirkarthi commented on issue #23343: Silent DAG import error by making owner a list
Posted by GitBox <gi...@apache.org>.
tirkarthi commented on issue #23343:
URL: https://github.com/apache/airflow/issues/23343#issuecomment-1113256897
Created https://github.com/apache/airflow/pull/23359 as a fix that checks the owner's type and raises `AirflowClusterPolicyViolation` with an appropriate message.
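A minimal sketch of that kind of type check, not the code from the PR. It uses stand-ins so it runs without Airflow: `PolicyViolation` stands in for `AirflowClusterPolicyViolation`, and `FakeTask`, `check_owner_type`, and the message wording are all hypothetical.

```python
from dataclasses import dataclass
from typing import Any


class PolicyViolation(Exception):
    """Hypothetical stand-in for Airflow's AirflowClusterPolicyViolation."""


@dataclass
class FakeTask:
    """Hypothetical stand-in for an operator; only the fields used here."""

    task_id: str
    owner: Any


def check_owner_type(task: FakeTask) -> None:
    """Reject any owner that is not a string, with a descriptive message."""
    if not isinstance(task.owner, str):
        raise PolicyViolation(
            f"owner should be a string. Current value: {task.owner!r} "
            f"(task_id={task.task_id!r})"
        )


# A string owner passes silently.
check_owner_type(FakeTask("say_hello", "person"))

# A list owner is rejected before any string method is called on it.
try:
    check_owner_type(FakeTask("say_hello", ["person"]))
except PolicyViolation as exc:
    print(exc)
```

Checking the type before calling string methods turns an unexpected `AttributeError` into a deliberate, descriptive exception.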
[GitHub] [airflow] tirkarthi commented on issue #23343: Silent DAG import error by making owner a list
Posted by GitBox <gi...@apache.org>.
tirkarthi commented on issue #23343:
URL: https://github.com/apache/airflow/issues/23343#issuecomment-1113211324
I am unable to reproduce this with the test case below. Adding the file to my dags folder also loads the DAG, though the owner is shown as empty.
```python
    def test_process_file_unhashable_owner(self):
        """Loading a DAG with owner as list"""
        dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=False)

        def create_dag():
            from airflow.decorators import dag

            @dag(default_args={'owners': ['owner1']})
            def my_flow_1():
                pass

            my_dag = my_flow_1()

        source_lines = [line[12:] for line in inspect.getsource(create_dag).splitlines(keepends=True)[1:]]
        with NamedTemporaryFile("w+", encoding="utf8") as tf_1:
            tf_1.writelines(source_lines)
            tf_1.flush()
            found_1 = dagbag.process_file(tf_1.name)
            found_1[0].sync_to_db()
            assert len(found_1) == 1 and found_1[0].dag_id == "my_flow_1"
            assert dagbag.import_errors == {}
            assert dagbag.dags['my_flow_1'] == found_1[0]

        with create_session() as session:
            dag_db = session.query(DagModel).filter(DagModel.dag_id == 'my_flow_1').first()
            assert dag_db is not None
```
[GitHub] [airflow] tirkarthi commented on issue #23343: Silent DAG import error by making owner a list
Posted by GitBox <gi...@apache.org>.
tirkarthi commented on issue #23343:
URL: https://github.com/apache/airflow/issues/23343#issuecomment-1113241669
OK, I am able to reproduce this with the test below, which passes with no import errors recorded. I did not have a cluster policy enabled in my local Airflow; the test cluster policy validates that the owner is non-default. When the owner is not a string, the policy raises the error shown in the traceback below.
```python
    @patch("airflow.settings.task_policy", cluster_policies.cluster_policy)
    def test_task_cluster_policy_unhashable_owner(self):
        """
        test that file processing results in import error when task does not
        obey cluster policy and has non-string owner
        """
        dag_file = os.path.join(TEST_DAGS_FOLDER, "test_unhashable_owner.py")

        dagbag = DagBag(dag_folder=dag_file, include_smart_sensor=False, include_examples=False)
        assert set() == set(dagbag.dag_ids)
        assert dagbag.import_errors == {}
```
```test_unhashable_owner.py
from datetime import timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="test_missing_owner",
    schedule_interval="0 0 * * *",
    start_date=days_ago(2),
    dagrun_timeout=timedelta(minutes=60),
    tags=["example"],
) as dag:
    # The rest of this call was overwritten by an editor tooltip in the original
    # message: the task_id is cut off after "test" and any further arguments
    # (presumably a list-valued owner) are not recoverable.
    run_this_last = EmptyOperator(
        task_id="test...",
    )
```
```
[2022-04-29 12:09:05,046] {dagbag.py:507} INFO - Filling up the DagBag from /opt/airflow/tests/dags/test_unhashable_owner.py
[2022-04-29 12:09:05,048] {dagbag.py:535} ERROR - 'list' object has no attribute 'lower'
Traceback (most recent call last):
File "/opt/airflow/airflow/models/dagbag.py", line 522, in collect_dags
found_dags = self.process_file(filepath, only_if_updated=only_if_updated, safe_mode=safe_mode)
File "/opt/airflow/airflow/models/dagbag.py", line 290, in process_file
found_dags = self._process_modules(filepath, mods, file_last_changed_on_disk)
File "/opt/airflow/airflow/models/dagbag.py", line 408, in _process_modules
self.bag_dag(dag=dag, root_dag=dag)
File "/opt/airflow/airflow/models/dagbag.py", line 433, in bag_dag
self._bag_dag(dag=dag, root_dag=root_dag, recursive=True)
File "/opt/airflow/airflow/models/dagbag.py", line 450, in _bag_dag
settings.task_policy(task)
File "/opt/airflow/tests/cluster_policies/__init__.py", line 64, in cluster_policy
_check_task_rules(task)
File "/opt/airflow/tests/cluster_policies/__init__.py", line 50, in _check_task_rules
rule(current_task)
File "/opt/airflow/tests/cluster_policies/__init__.py", line 30, in task_must_have_owners
if not task.owner or task.owner.lower() == conf.get('operators', 'default_owner'):
AttributeError: 'list' object has no attribute 'lower'
```
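One reading of the traceback above, sketched with stand-ins rather than Airflow's real classes: the policy check raises a plain `AttributeError`, the collection loop logs it (the `dagbag.py:535` ERROR line), and nothing is written to `import_errors`. The assumption, not confirmed in this thread, is that only specific exception types are recorded as import errors while anything else is merely logged. `MiniBag`, `PolicyViolation`, `DEFAULT_OWNER`, and both policy functions are hypothetical names.

```python
import logging

log = logging.getLogger("minibag")

# Placeholder for the configured default owner; the value is an assumption.
DEFAULT_OWNER = "airflow"


class PolicyViolation(Exception):
    """Hypothetical stand-in for AirflowClusterPolicyViolation."""


def policy_unchecked(owner):
    # Mirrors the failing line in the traceback: assumes owner is a string.
    if not owner or owner.lower() == DEFAULT_OWNER:
        raise PolicyViolation("task must have a non-default owner")


def policy_type_checked(owner):
    # Validate the type first so a bad owner becomes a policy violation.
    if not isinstance(owner, str):
        raise PolicyViolation(f"owner should be a string, got {owner!r}")
    policy_unchecked(owner)


class MiniBag:
    """Hypothetical, heavily simplified model of a DAG collection."""

    def __init__(self, policy):
        self.policy = policy
        self.dags = {}
        self.import_errors = {}

    def process_file(self, path, dag_id, owner):
        try:
            self.policy(owner)
            self.dags[dag_id] = owner
        except PolicyViolation as exc:
            # Known exception type: recorded where a UI could surface it.
            self.import_errors[path] = str(exc)

    def collect(self, path, dag_id, owner):
        try:
            self.process_file(path, dag_id, owner)
        except Exception as exc:
            # Anything else is only logged, so the failure is silent.
            log.error("%s", exc)


silent = MiniBag(policy_unchecked)
silent.collect("bad.py", "demo", ["person"])
print(silent.dags, silent.import_errors)  # both empty: DAG missing, no error shown

loud = MiniBag(policy_type_checked)
loud.collect("bad.py", "demo", ["person"])
print(loud.import_errors)  # the bad owner is now recorded against the file
```

In this model the only difference between the silent and the visible failure is the exception type that reaches `process_file`, which is why a type check that raises the policy exception would surface the problem.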