Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/12 12:03:16 UTC
[GitHub] [airflow] ephraimbuddy opened a new pull request #19553: Fix IntegrityError in `DagFileProcessor.manage_slas`
ephraimbuddy opened a new pull request #19553:
URL: https://github.com/apache/airflow/pull/19553
`DagFileProcessor.manage_slas` does not check whether an SlaMiss record already
exists in the DB before inserting SLA misses.
If an SLA miss for a task has already been recorded and there has been no more
recent run of that task, the task comes up again on the next SLA check. We then
try to insert the same record into the SlaMiss table a second time, which
results in an IntegrityError.
This PR fixes that by skipping the insert if the record already exists.
```
[2021-11-12 11:56:11,159] {processor.py:567} ERROR - Error executing SlaCallbackRequest callback for file: /files/dags/example_sla_dag.py
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1257, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 912, in do_executemany
cursor.executemany(statement, parameters)
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "sla_miss_pkey"
DETAIL: Key (task_id, dag_id, execution_date)=(sleep_20, example_sla_dag, 2021-11-12 11:56:00+00) already exists.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/airflow/airflow/dag_processing/processor.py", line 560, in execute_callbacks
self.manage_slas(dagbag.dags.get(request.dag_id))
File "/opt/airflow/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/opt/airflow/airflow/dag_processing/processor.py", line 434, in manage_slas
session.commit()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1046, in commit
self.transaction.commit()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 504, in commit
self._prepare_impl()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 483, in _prepare_impl
self.session.flush()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2540, in flush
self._flush(objects)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2682, in _flush
transaction.rollback(_capture_exception=True)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
with_traceback=exc_tb,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2642, in _flush
flush_context.execute()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
rec.execute(self)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
uow,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
insert,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1083, in _emit_insert_statements
c = cached_connections[connection].execute(statement, multiparams)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
return meth(self, multiparams, params)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
distilled_params,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
e, statement, parameters, cursor, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1257, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 912, in do_executemany
cursor.executemany(statement, parameters)
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "sla_miss_pkey"
DETAIL: Key (task_id, dag_id, execution_date)=(sleep_20, example_sla_dag, 2021-11-12 11:56:00+00) already exists.
```
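The idea described above (skip the insert when the record is already there) can be sketched outside Airflow. This is a minimal illustration only, not the code from the PR: it uses the standard-library `sqlite3` module as a stand-in for the metadata database, and the table layout and the helper names `make_db` and `record_sla_misses` are assumptions made for the example. Only the key columns `(task_id, dag_id, execution_date)` come from the error message above.

```python
import sqlite3


def make_db():
    """Create an in-memory table whose primary key mirrors the one in the error."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE sla_miss ("
        "task_id TEXT, dag_id TEXT, execution_date TEXT, "
        "PRIMARY KEY (task_id, dag_id, execution_date))"
    )
    return conn


def record_sla_misses(conn, candidates):
    """Insert only the SLA misses that are not recorded yet.

    `candidates` is an iterable of (task_id, dag_id, execution_date) tuples.
    Returns the list of rows that were actually inserted.
    """
    # Keys that already exist in the table.
    recorded = set(
        conn.execute("SELECT task_id, dag_id, execution_date FROM sla_miss")
    )
    new_rows = []
    for key in candidates:
        if key in recorded:
            # Already recorded: a second insert would violate the primary key.
            continue
        new_rows.append(key)
        recorded.add(key)  # also skips duplicates within `candidates` itself
    conn.executemany(
        "INSERT INTO sla_miss (task_id, dag_id, execution_date) VALUES (?, ?, ?)",
        new_rows,
    )
    conn.commit()
    return new_rows
```

Calling `record_sla_misses` twice with the same key inserts the row once and returns an empty list the second time, whereas a plain `INSERT` of the same key raises `sqlite3.IntegrityError`, the SQLite counterpart of the `UniqueViolation` in the traceback.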
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] uranusjr commented on a change in pull request #19553: Fix IntegrityError in `DagFileProcessor.manage_slas`
Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19553:
URL: https://github.com/apache/airflow/pull/19553#discussion_r748241982
##########
File path: airflow/dag_processing/processor.py
##########
@@ -389,6 +389,12 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
.group_by(TI.task_id)
.subquery('sq')
)
+ # get recorded SlaMiss
+ recorded_slas = (
+ session.query(SlaMiss.dag_id, SlaMiss.task_id, SlaMiss.execution_date)
+ .filter(SlaMiss.dag_id == dag.dag_id, SlaMiss.task_id.in_(dag.task_ids))
+ .all()
+ )
Review comment:
```suggestion
recorded_slas_query = set(
    session.query(SlaMiss.dag_id, SlaMiss.task_id, SlaMiss.execution_date)
    .filter(SlaMiss.dag_id == dag.dag_id, SlaMiss.task_id.in_(dag.task_ids))
)
```
Using a set makes the lookup faster.
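To illustrate the reviewer's point: testing membership against a list scans it element by element, while a set uses hashing, so each check takes roughly constant time on average. The sketch below uses plain tuples in place of query result rows, and the helper name `filter_unrecorded` is hypothetical; it is not part of the PR.

```python
def filter_unrecorded(candidates, recorded_rows):
    """Return the candidate keys that are not present in `recorded_rows`.

    Both arguments hold (dag_id, task_id, execution_date) tuples.
    """
    # Build the set once; every `in` check afterwards is a hash lookup
    # instead of a linear scan over the recorded rows.
    recorded = set(recorded_rows)
    return [key for key in candidates if key not in recorded]
```

For example, with one recorded key and two candidates, only the candidate that is not yet recorded is returned, in its original order.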
[GitHub] [airflow] ephraimbuddy closed pull request #19553: Fix IntegrityError in `DagFileProcessor.manage_slas`
Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #19553:
URL: https://github.com/apache/airflow/pull/19553
[GitHub] [airflow] github-actions[bot] commented on pull request #19553: Fix IntegrityError in `DagFileProcessor.manage_slas`
Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #19553:
URL: https://github.com/apache/airflow/pull/19553#issuecomment-967081656
The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it onto the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.
[GitHub] [airflow] ephraimbuddy merged pull request #19553: Fix IntegrityError in `DagFileProcessor.manage_slas`
Posted by GitBox <gi...@apache.org>.
ephraimbuddy merged pull request #19553:
URL: https://github.com/apache/airflow/pull/19553