Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/12 12:03:16 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #19553: Fix IntegrityError in `DagFileProcessor.manage_slas`

ephraimbuddy opened a new pull request #19553:
URL: https://github.com/apache/airflow/pull/19553


   `DagFileProcessor.manage_slas` does not check whether an SlaMiss already exists in
   the DB before inserting SLA misses.
   
   Once an SLA miss for a task has been recorded, the next SLA check can pick the
   same task up again if there has been no recent run of it. We then try to insert
   the same record into the SlaMiss table a second time, which raises an IntegrityError.
   
   This PR fixes that by skipping the insert when the record already exists.
   
   ```
   [2021-11-12 11:56:11,159] {processor.py:567} ERROR - Error executing SlaCallbackRequest callback for file: /files/dags/example_sla_dag.py
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1257, in _execute_context
       cursor, statement, parameters, context
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 912, in do_executemany
       cursor.executemany(statement, parameters)
   psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "sla_miss_pkey"
   DETAIL:  Key (task_id, dag_id, execution_date)=(sleep_20, example_sla_dag, 2021-11-12 11:56:00+00) already exists.
   
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/opt/airflow/airflow/dag_processing/processor.py", line 560, in execute_callbacks
       self.manage_slas(dagbag.dags.get(request.dag_id))
     File "/opt/airflow/airflow/utils/session.py", line 70, in wrapper
       return func(*args, session=session, **kwargs)
     File "/opt/airflow/airflow/dag_processing/processor.py", line 434, in manage_slas
       session.commit()
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1046, in commit
       self.transaction.commit()
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 504, in commit
       self._prepare_impl()
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 483, in _prepare_impl
       self.session.flush()
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2540, in flush
       self._flush(objects)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2682, in _flush
       transaction.rollback(_capture_exception=True)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
       with_traceback=exc_tb,
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2642, in _flush
       flush_context.execute()
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
       rec.execute(self)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
       uow,
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
       insert,
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1083, in _emit_insert_statements
       c = cached_connections[connection].execute(statement, multiparams)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
       return meth(self, multiparams, params)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
       return connection._execute_clauseelement(self, multiparams, params)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
       distilled_params,
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
       e, statement, parameters, cursor, context
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
       sqlalchemy_exception, with_traceback=exc_info[2], from_=e
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1257, in _execute_context
       cursor, statement, parameters, context
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 912, in do_executemany
       cursor.executemany(statement, parameters)
   sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "sla_miss_pkey"
   DETAIL:  Key (task_id, dag_id, execution_date)=(sleep_20, example_sla_dag, 2021-11-12 11:56:00+00) already exists.
   ```
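The fix described above amounts to reading the keys that are already recorded and inserting only the new ones. Below is a minimal sketch of that check-then-insert pattern. It assumes a simplified, hypothetical `sla_miss` table in an in-memory SQLite database (Python's standard `sqlite3` module) instead of Airflow's real models and SQLAlchemy session. `SCHEMA` and `insert_sla_misses` are illustrative names, not Airflow code.

```python
import sqlite3

# Hypothetical, simplified stand-in for Airflow's sla_miss table. The
# composite primary key mirrors the (task_id, dag_id, execution_date) key
# named in the UniqueViolation above.
SCHEMA = """
CREATE TABLE sla_miss (
    task_id TEXT NOT NULL,
    dag_id TEXT NOT NULL,
    execution_date TEXT NOT NULL,
    PRIMARY KEY (task_id, dag_id, execution_date)
)
"""


def insert_sla_misses(conn, candidates):
    """Insert candidate keys that are not yet recorded; return how many were inserted."""
    # Read the keys that are already stored (sqlite3 yields each row as a tuple).
    recorded = set(conn.execute("SELECT task_id, dag_id, execution_date FROM sla_miss"))
    new_rows = []
    for key in candidates:
        if key not in recorded:
            recorded.add(key)  # also skips duplicates within `candidates` itself
            new_rows.append(key)
    conn.executemany("INSERT INTO sla_miss VALUES (?, ?, ?)", new_rows)
    conn.commit()
    return len(new_rows)


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
key = ("sleep_20", "example_sla_dag", "2021-11-12 11:56:00+00")

print(insert_sla_misses(conn, [key]))  # prints 1: the first SLA check records the miss
print(insert_sla_misses(conn, [key]))  # prints 0: a later check finds it already recorded

# Inserting the same key without the check reproduces the class of error above.
try:
    conn.execute("INSERT INTO sla_miss VALUES (?, ?, ?)", key)
except sqlite3.IntegrityError as exc:
    print("IntegrityError:", exc)
```

A check-then-insert like this is not atomic: two processes running SLA checks at the same moment could both see a key as missing. Catching the IntegrityError, or using a database-specific upsert, is the usual way to close that gap.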
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #19553: Fix IntegrityError in `DagFileProcessor.manage_slas`

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19553:
URL: https://github.com/apache/airflow/pull/19553#discussion_r748241982



##########
File path: airflow/dag_processing/processor.py
##########
@@ -389,6 +389,12 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
             .group_by(TI.task_id)
             .subquery('sq')
         )
+        # get recorded SlaMiss
+        recorded_slas = (
+            session.query(SlaMiss.dag_id, SlaMiss.task_id, SlaMiss.execution_date)
+            .filter(SlaMiss.dag_id == dag.dag_id, SlaMiss.task_id.in_(dag.task_ids))
+            .all()
+        )

Review comment:
       ```suggestion
           recorded_slas_query = set(
               session.query(SlaMiss.dag_id, SlaMiss.task_id, SlaMiss.execution_date)
               .filter(SlaMiss.dag_id == dag.dag_id, SlaMiss.task_id.in_(dag.task_ids))
           )
       ```
   
   Make lookup faster: with a `set`, each "already recorded?" check is a hash lookup instead of a scan through a list.
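The reviewer's suggestion can be illustrated without a database. In this sketch a hypothetical `RecordedSla` namedtuple stands in for the tuple-like rows the query yields, in the same (dag_id, task_id, execution_date) order as the diff. `unrecorded_sla_misses` is an illustrative helper, not part of Airflow. The sketch relies on rows comparing and hashing like plain tuples. That holds for namedtuples, but it is an assumption worth checking for a given SQLAlchemy version's row type.

```python
from collections import namedtuple
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, Tuple

# Hypothetical stand-in for the rows returned by
# session.query(SlaMiss.dag_id, SlaMiss.task_id, SlaMiss.execution_date).
RecordedSla = namedtuple("RecordedSla", ["dag_id", "task_id", "execution_date"])

SlaKey = Tuple[str, str, datetime]


def unrecorded_sla_misses(candidates: Iterable[SlaKey], recorded_rows: Iterable[SlaKey]) -> List[SlaKey]:
    """Return the candidate keys that have no recorded SLA miss yet."""
    # Building the set costs one pass over the recorded rows...
    recorded = set(recorded_rows)
    # ...after which each membership test is an average O(1) hash lookup,
    # rather than an O(n) scan when `recorded` is a list.
    return [key for key in candidates if key not in recorded]


ts = datetime(2021, 11, 12, 11, 56, tzinfo=timezone.utc)
recorded = [RecordedSla("example_sla_dag", "sleep_20", ts)]
candidates = [
    ("example_sla_dag", "sleep_20", ts),                         # already recorded
    ("example_sla_dag", "sleep_20", ts + timedelta(minutes=1)),  # a new miss
]
print(unrecorded_sla_misses(candidates, recorded))
```

Only the second candidate is returned, because a plain tuple compares and hashes equal to a namedtuple holding the same values.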







[GitHub] [airflow] ephraimbuddy closed pull request #19553: Fix IntegrityError in `DagFileProcessor.manage_slas`

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed pull request #19553:
URL: https://github.com/apache/airflow/pull/19553


   





[GitHub] [airflow] github-actions[bot] commented on pull request #19553: Fix IntegrityError in `DagFileProcessor.manage_slas`

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #19553:
URL: https://github.com/apache/airflow/pull/19553#issuecomment-967081656


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] ephraimbuddy merged pull request #19553: Fix IntegrityError in `DagFileProcessor.manage_slas`

Posted by GitBox <gi...@apache.org>.
ephraimbuddy merged pull request #19553:
URL: https://github.com/apache/airflow/pull/19553


   

