Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/21 13:06:26 UTC
[GitHub] [airflow] sudarshan2906 opened a new issue #13811: get_dagrun() function starts giving error in 2.0 when using inside cluster policy
sudarshan2906 opened a new issue #13811:
URL: https://github.com/apache/airflow/issues/13811
**Apache Airflow version**: 2.0
- **OS** (e.g. from /etc/os-release):
PRETTY_NAME="Debian GNU/Linux 10 (buster)"
NAME="Debian GNU/Linux"
VERSION_ID="10"
VERSION="10 (buster)"
VERSION_CODENAME=buster
ID=debian
**What happened**:
I am using a `task_instance_mutation_hook` cluster policy and, inside it, calling `get_dagrun()` on the task_instance object to get the dag run for that task instance. This worked fine in 1.10.14, but in 2.0 it raises the following error, which prevents the scheduler from starting.
```
    dag_run = task_instance.get_dagrun()
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.7/contextlib.py", line 119, in __exit__
    next(self.gen)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/session.py", line 32, in create_session
    session.commit()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1042, in commit
    self.transaction.commit()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 504, in commit
    self._prepare_impl()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 472, in _prepare_impl
    self.session.dispatch.before_commit(self.session)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/event/attr.py", line 322, in __call__
    fn(*args, **kw)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/sqlalchemy.py", line 217, in _validate_commit
    raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!")
```
I am using the dag run object to get the conf passed to the dag run and I am setting some properties of the task_instance according to it.
**How to reproduce it**:
Example of the cluster policy used:
```
def task_instance_mutation_hook(task_instance):
    dag_run = task_instance.get_dagrun()
    conf = dag_run.conf
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] kaxil commented on issue #13811: get_dagrun() function starts giving error in 2.0 when using inside cluster policy
Posted by GitBox <gi...@apache.org>.
kaxil commented on issue #13811:
URL: https://github.com/apache/airflow/issues/13811#issuecomment-773270707
Try the following, which delays the import and creates the session inside the mutation hook:
```python
def task_instance_mutation_hook(task_instance):
    from airflow.settings import Session
    session = Session()
    dag_run = task_instance.get_dagrun(session=session)
    conf = dag_run.conf
```
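A slightly more defensive variant of the hook above might look like the following. This is only a sketch: the `queue` key in the dag run conf is a hypothetical example of a property to set, and closing the session in a `finally` block is an assumption about good hygiene rather than something confirmed in this thread.

```python
def task_instance_mutation_hook(task_instance):
    # Import lazily: per the traceback in this thread, airflow.settings.Session
    # can still be None while airflow_local_settings is being imported.
    from airflow.settings import Session

    session = Session()
    try:
        dag_run = task_instance.get_dagrun(session=session)
        # conf can be None when the run was triggered without any conf.
        conf = (dag_run.conf if dag_run is not None else None) or {}
        # Hypothetical key: route the task to a queue named in the conf.
        if "queue" in conf:
            task_instance.queue = conf["queue"]
    finally:
        # Assumption: release the connection once the lookup is done.
        session.close()
```

The conf is read before the session is closed, so the hook does not depend on the dag run object staying attached to a session afterwards.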
----------------------------------------------------------------
[GitHub] [airflow] kaxil edited a comment on issue #13811: get_dagrun() function starts giving error in 2.0 when using inside cluster policy
Posted by GitBox <gi...@apache.org>.
kaxil edited a comment on issue #13811:
URL: https://github.com/apache/airflow/issues/13811#issuecomment-772513132
A workaround for this, for now, is to pass a session to the `get_dagrun` method:
```python
from airflow.settings import Session
session = Session()
def task_instance_mutation_hook(task_instance):
    dag_run = task_instance.get_dagrun(session=session)
    conf = dag_run.conf
```
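Why passing a session helps can be illustrated with a simplified model of the decorator pattern visible in the original traceback (`wrapper` calling `create_session`, which commits on exit). The code below is a hypothetical sketch, not Airflow's actual implementation: `FakeSession`, `created_sessions`, and the bodies of `create_session` and `provide_session` are stand-ins written for illustration.

```python
import contextlib
import functools


class FakeSession:
    """Hypothetical stand-in for a database session that counts commits."""

    def __init__(self):
        self.commits = 0

    def commit(self):
        self.commits += 1

    def close(self):
        pass


created_sessions = []  # sessions opened implicitly by the decorator


@contextlib.contextmanager
def create_session():
    """Open a session, commit when the block exits cleanly, then close."""
    session = FakeSession()
    created_sessions.append(session)
    try:
        yield session
        session.commit()
    finally:
        session.close()


def provide_session(func):
    """Supply a session only when the caller did not pass one."""

    @functools.wraps(func)
    def wrapper(*args, session=None, **kwargs):
        if session is not None:
            # Caller owns the session: no implicit commit happens here.
            return func(*args, session=session, **kwargs)
        with create_session() as new_session:
            return func(*args, session=new_session, **kwargs)

    return wrapper


@provide_session
def get_dagrun(session=None):
    return "dag_run"


get_dagrun()                        # implicit session, committed on exit
own_session = FakeSession()
get_dagrun(session=own_session)     # caller's session, never committed here
```

In this model the implicit commit is the step that would trip a guard like the `UNEXPECTED COMMIT` check, while the call with an explicit session skips it.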
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on issue #13811: get_dagrun() function starts giving error in 2.0 when using inside cluster policy
Posted by GitBox <gi...@apache.org>.
kaxil commented on issue #13811:
URL: https://github.com/apache/airflow/issues/13811#issuecomment-776668721
@sudarshan2906 Did it work?
----------------------------------------------------------------
[GitHub] [airflow] sudarshan2906 commented on issue #13811: get_dagrun() function starts giving error in 2.0 when using inside cluster policy
Posted by GitBox <gi...@apache.org>.
sudarshan2906 commented on issue #13811:
URL: https://github.com/apache/airflow/issues/13811#issuecomment-773161859
@kaxil
I tried the workaround, but I am getting this error:
```
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 5, in <module>
    from airflow.__main__ import main
  File "/usr/local/lib/python3.7/site-packages/airflow/__init__.py", line 46, in <module>
    settings.initialize()
  File "/usr/local/lib/python3.7/site-packages/airflow/settings.py", line 430, in initialize
    import_local_settings()
  File "/usr/local/lib/python3.7/site-packages/airflow/settings.py", line 400, in import_local_settings
    import airflow_local_settings
  File "/root/airflow/config/airflow_local_settings.py", line 7, in <module>
    session = Session()
TypeError: 'NoneType' object is not callable
```
```
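The `TypeError` above is consistent with `Session` still being `None` at the moment `airflow_local_settings` is imported, since the traceback shows that import happening inside `settings.initialize()`. The sketch below reproduces that ordering with a hypothetical stand-in object; `fake_settings` and `configure_orm` are illustrative names, not Airflow's code.

```python
from types import SimpleNamespace

# Hypothetical stand-in for a settings module whose Session factory
# is only assigned by a later initialization step.
fake_settings = SimpleNamespace(Session=None)


def configure_orm():
    """Simulate the step that installs the session factory."""
    fake_settings.Session = lambda: object()


def hook():
    """Look up the factory at call time, as a lazy import inside a hook does."""
    return fake_settings.Session()


# Mimics `session = Session()` at module level, executed before setup.
try:
    fake_settings.Session()
    eager_error = None
except TypeError as exc:
    eager_error = str(exc)

configure_orm()          # initialization finishes later
late_session = hook()    # the same call now succeeds

print(eager_error)
```

Deferring the lookup until the hook runs is what sidesteps the failure in this model.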
----------------------------------------------------------------
[GitHub] [airflow] eladkal closed issue #13811: get_dagrun() function starts giving error in 2.0 when using inside cluster policy
Posted by GitBox <gi...@apache.org>.
eladkal closed issue #13811:
URL: https://github.com/apache/airflow/issues/13811
--
[GitHub] [airflow] sudarshan2906 commented on issue #13811: get_dagrun() function starts giving error in 2.0 when using inside cluster policy
Posted by GitBox <gi...@apache.org>.
sudarshan2906 commented on issue #13811:
URL: https://github.com/apache/airflow/issues/13811#issuecomment-779676997
@kaxil Sorry for the late reply.
Yes this worked :)