Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/08/04 10:38:58 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request, #25532: Move session.rollback closer to the exception

ephraimbuddy opened a new pull request, #25532:
URL: https://github.com/apache/airflow/pull/25532

   I'm not sure what's happening here, but moving `session.rollback()` closer to the exception stopped the scheduler from crashing when there is an integrity error. I couldn't reproduce this in tests, but I can reproduce it consistently in Airflow.
   
   Here's how to reproduce the scheduler crash:
   Run this DAG on `main`:
   ```python
   from datetime import datetime
   
   from airflow import DAG
   from airflow.decorators import task
   
   with DAG(dag_id='mvp_map_task_bug', start_date=datetime(2022, 1, 1), schedule_interval='@daily', catchup=False) as dag:
       @task
       def get_files():
           return [1,2,3,4,5,6]
   
       @task
       def download_files(file: str):
           print(f"{file}")
   
       files = download_files.expand(file=get_files())
   ```
   1. Stop the scheduler.
   2. Reduce the list returned by `get_files` to something like `[1, 2, 3]`.
   3. Change this `difference` to `symmetric_difference`: https://github.com/apache/airflow/blob/171aaf017aee068d8e1b76121c8c75310c854d9e/airflow/models/dagrun.py#L1189
   4. Start the scheduler and clear the above task so it can run again.
   
   The scheduler will now try to create new TIs and crash. Switch to this PR and repeat the steps: you will notice that the scheduler survives and the rollback is successful.
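The repro hinges on swapping `difference` for `symmetric_difference`. Here is a minimal sketch of why that swap can hand back indexes that already have rows. It assumes the line compares the map indexes a task should have with the ones already stored; the set names and values below are hypothetical, not taken from `dagrun.py`.

```python
# Hypothetical sets: map indexes the task should have vs. rows already stored.
expected = {0, 1, 2}            # e.g. after shrinking the mapped list
existing = {0, 1, 2, 3, 4, 5}   # TI rows left over from the longer list

# difference: only indexes that are expected but not stored yet.
to_create = expected.difference(existing)

# symmetric_difference: also returns indexes that are stored but no longer
# expected, i.e. rows that already exist in the task_instance table.
to_create_broken = expected.symmetric_difference(existing)

# Anything to be "created" that is already stored would be inserted again and
# violate the (dag_id, task_id, run_id, map_index) primary key.
collisions = to_create_broken & existing

print(sorted(to_create))         # []
print(sorted(to_create_broken))  # [3, 4, 5]
print(sorted(collisions))        # [3, 4, 5]
```

Which indexes actually collide depends on the real code at that line (the traceback in this thread reports `map_index` 1). The sketch only shows that `symmetric_difference` also returns elements present solely in the existing set.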


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on pull request #25532: Move session.rollback closer to the exception

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25532:
URL: https://github.com/apache/airflow/pull/25532#issuecomment-1205170997

   I have another hypothesis:
   
   ```
       run_id = Column(StringID(), nullable=False)
   ```
   
   And we are trying to access it in the log:
   ```
               self.log.info(
                   'Hit IntegrityError while creating the TIs for %s- %s',
                   dag_id,
                   self.run_id,
                   exc_info=True,
               )
   ```
   
   `run_id` is a database-backed SQLAlchemy attribute. If the object failed to be added due to the IntegrityError, MAYBE when you access it before the rollback, SQLAlchemy knows it just failed to add the DagRun. The object is then in a limbo state where it is not yet added but not yet detached either, so the access returns the error.
   
   When you do `rollback()`, the DagRun object itself becomes detached and you can access its `run_id`.
   
   It's a long shot, but maybe? You can test THAT hypothesis by logging a bogus string instead of `self.run_id`.
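Independently of the session state, Python evaluates `self.run_id` at the call site before `log.info()` runs. The attribute is therefore read regardless of log level, handlers, or `exc_info`. Here is a minimal stdlib sketch of that ordering. `FakeRun` is a hypothetical class whose property raises the way a failed attribute reload would.

```python
import logging

log = logging.getLogger("eager-args-demo")
log.setLevel(logging.WARNING)  # INFO records are discarded by this logger


class FakeRun:
    """Hypothetical stand-in for a DagRun whose run_id cannot be loaded."""

    @property
    def run_id(self):
        # Simulates an ORM attribute reload failing.
        raise RuntimeError("cannot load run_id: rollback pending")


run = FakeRun()

try:
    # run.run_id is evaluated before log.info() is even called,
    # so the property raises although INFO is below the logger's level.
    log.info("Hit IntegrityError for %s", run.run_id)
    failed = False
except RuntimeError:
    failed = True

print(failed)  # True

# Passing a plain string instead never touches the attribute.
log.info("Hit IntegrityError for %s", "bogus-run-id")
```

This matches the traceback later in the thread. There, the failing frame is the `self.run_id,` line in `_create_task_instances`, not anything inside the logging module.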
   
   
   



[GitHub] [airflow] potiuk commented on pull request #25532: Move session.rollback closer to the exception

potiuk commented on PR #25532:
URL: https://github.com/apache/airflow/pull/25532#issuecomment-1205204207

   > Still failed. I moved the log after the other log that used the `run_id`. That's this log: `self.log.info('Doing session rollback.')` before the session.rollback but it still failed. I think the issue is that we access `self` to get the log?
   
   Just to clarify (because I am not 100% sure): did it also fail when you did this?
   
   ```
           except IntegrityError:
               self.log.info('Session rolled back.')
               session.rollback()
               self.log.info(
                   'Hit IntegrityError while creating the TIs for %s- %s',
                   dag_id,
                   self.run_id,
                   exc_info=True,
               )
   
   ```
   
   If that's the case, then I am afraid the root cause is somewhere deeper, and we are probably seeing some race condition manifesting itself. Accessing `self.log` should not trigger it. Another option is that one of your logging handlers (do you have any?) accesses the DB in some way.



[GitHub] [airflow] ephraimbuddy commented on pull request #25532: Move session.rollback closer to the exception

ephraimbuddy commented on PR #25532:
URL: https://github.com/apache/airflow/pull/25532#issuecomment-1206112702

   > I looked at the code and this seems to be the only place (except tests) where we use `session.bulk_*` operations, so we should be pretty safe.
   > 
   > However, I think it would be good to add a description of the problem and a link to the superset issue in a comment here, just to capture it somewhere in the code (I am thinking about my future self in a year, hitting a similar issue and trying to find out what it was about when I recall that something similar happened in the past) :)
   
   Take a look at how I did it now. I feel it's much better and more understandable.
   



[GitHub] [airflow] potiuk commented on pull request #25532: Move session.rollback closer to the exception

potiuk commented on PR #25532:
URL: https://github.com/apache/airflow/pull/25532#issuecomment-1205273001

   I think it's the same issue as the one here: https://github.com/apache/superset/pull/530



[GitHub] [airflow] potiuk commented on pull request #25532: Move session.rollback closer to the exception

potiuk commented on PR #25532:
URL: https://github.com/apache/airflow/pull/25532#issuecomment-1205116451

   However, this could also be just "logging" (no secret masking involved). To verify this hypothesis, you would have to repeat the tests without `exc_info` in the log.



[GitHub] [airflow] potiuk commented on pull request #25532: Move session.rollback closer to the exception

potiuk commented on PR #25532:
URL: https://github.com/apache/airflow/pull/25532#issuecomment-1205108996

   What was the crash you experienced? 



[GitHub] [airflow] potiuk commented on pull request #25532: Move session.rollback closer to the exception

potiuk commented on PR #25532:
URL: https://github.com/apache/airflow/pull/25532#issuecomment-1205275812

   So I think the problem is due to the bulk nature of the change above. The DagRun is modified by one of the "successful" transactions, but then the exception makes the connection dirty. The DagRun object is not "expunged"; it's just marked as modified because part of the bulk update actually succeeded.
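The "part of the bulk operation already succeeded" point can be illustrated with the standard-library `sqlite3` module. It is only a stand-in: the scheduler in this thread runs on PostgreSQL through SQLAlchemy, and the table below is a simplified, hypothetical schema. The first row of a multi-row insert lands inside the open transaction and the second hits the primary key. Only a rollback discards the partial work.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified, hypothetical schema; the real task_instance table has more columns.
conn.execute(
    "CREATE TABLE task_instance ("
    " task_id TEXT, map_index INTEGER, PRIMARY KEY (task_id, map_index))"
)
conn.execute("INSERT INTO task_instance VALUES ('download_files', 1)")
conn.commit()

rows = [("download_files", 2), ("download_files", 1)]  # index 1 already exists
try:
    conn.executemany("INSERT INTO task_instance VALUES (?, ?)", rows)
except sqlite3.IntegrityError as exc:
    print("IntegrityError:", exc)

# The row for index 2 was written inside the still-open transaction.
before = conn.execute("SELECT COUNT(*) FROM task_instance").fetchone()[0]
print(before)  # 2

# Rolling back discards the partial bulk insert as a whole.
conn.rollback()
after = conn.execute("SELECT COUNT(*) FROM task_instance").fetchone()[0]
print(after)  # 1
```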



[GitHub] [airflow] potiuk commented on pull request #25532: Move session.rollback closer to the exception

potiuk commented on PR #25532:
URL: https://github.com/apache/airflow/pull/25532#issuecomment-1206318345

   You probably want to rename it/update the commit description before merging :)



[GitHub] [airflow] potiuk commented on pull request #25532: Move session.rollback closer to the exception

potiuk commented on PR #25532:
URL: https://github.com/apache/airflow/pull/25532#issuecomment-1205261137

   > Exactly the code you have above is what I tried, and it failed. I don't have any logging handlers, and no environment variables were set either.
   
   Hmm. Can you really reproduce it? Maybe there was some mistake when you ran it and you ran another version (or maybe a .pyc file was compiled, or something like that).
   
   When I look closer at the stack trace, it DOES seem like my last hypothesis.
   
   
   ```
     File "/opt/airflow/airflow/models/dagrun.py", line 950, in verify_integrity
       self._create_task_instances(dag.dag_id, tasks, created_counts, hook_is_noop, session=session)
     File "/opt/airflow/airflow/models/dagrun.py", line 1154, in _create_task_instances
       self.run_id,
   ```
   
   ^^ This is the part of the `log.info` call where `run_id` is accessed.
   
   ```
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/attributes.py", line 481, in __get__
       return self.impl.get(state, dict_)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/attributes.py", line 926, in get
       value = self._fire_loader_callables(state, key, passive)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/attributes.py", line 957, in _fire_loader_callables
       return state._load_expired(state, passive)
   ```
   
   ^^ And it seems SQLAlchemy tries to retrieve and load the object. When it tries to execute the SQL command, it finds out that a rollback is pending and should be executed first.
   
   So it would be rather strange to see this happening with this code:
   
   ```
           except IntegrityError:
               self.log.info('Session rolled back.')
               session.rollback()
               self.log.info(
                   'Hit IntegrityError while creating the TIs for %s- %s',
                   dag_id,
                   self.run_id,
                   exc_info=True,
               )
   ```
   
   Can you produce such an exception, check that the line numbers match, and get the stack trace?



[GitHub] [airflow] ephraimbuddy commented on pull request #25532: Move session.rollback closer to the exception

ephraimbuddy commented on PR #25532:
URL: https://github.com/apache/airflow/pull/25532#issuecomment-1205243459

   Exactly the code you have above is what I tried, and it failed. I don't have any logging handlers, and no environment variables were set either.



[GitHub] [airflow] potiuk commented on pull request #25532: Move session.rollback closer to the exception

potiuk commented on PR #25532:
URL: https://github.com/apache/airflow/pull/25532#issuecomment-1205387137

   I looked at the code and this seems to be the only place (except tests) where we use `session.bulk_*` operations, so we should be pretty safe.
   
   However, I think it would be good to add a description of the problem and a link to the superset issue in a comment here, just to capture it somewhere in the code (I am thinking about my future self in a year, hitting a similar issue and trying to find out what it was about when I recall that something similar happened in the past) :)



[GitHub] [airflow] potiuk commented on pull request #25532: Move session.rollback closer to the exception

potiuk commented on PR #25532:
URL: https://github.com/apache/airflow/pull/25532#issuecomment-1205120704

   Nothing obvious - but it would be great to verify these two hypotheses because maybe we can discover a pattern that we should avoid (or even forbid via pre-commit). 



[GitHub] [airflow] ashb commented on pull request #25532: Move session.rollback closer to the exception

ashb commented on PR #25532:
URL: https://github.com/apache/airflow/pull/25532#issuecomment-1205267400

   > ^^ And it seems SQLAlchemy tries to retrieve the object and load it. When it tries to execute the SQL command, it finds out that a rollback is pending and should be executed before.
   
   ```
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/state.py", line 710, in _load_expired
   ```
   
   This attribute has been marked as expired, so SQLA is trying to reload the value. SQLA doesn't do this by itself, so _somewhere_ we are expiring the object but leaving it attached to the session. (If it were detached via `expunge`, it would behave differently.)
   
   Do we make any calls to `expire`/`expire_all` after this object was loaded but before it was accessed in this block?
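The failure sequence in the traceback can be modeled without SQLAlchemy: a failed bulk write, then an expired attribute whose reload is refused. The sketch below is a deliberately small toy, not SQLAlchemy's implementation. `ToySession`, `ToyRun`, `PendingRollback`, and `create_tis` are hypothetical names, and the standard-library `sqlite3` stands in for PostgreSQL. The toy also makes an assumption based on the frames in the log, where `_bulk_save_mappings` calls `transaction.rollback(_capture_exception=True)`. It assumes the failed bulk insert rolls back internally. That rollback would leave already-loaded objects expired, and the session would refuse new SQL until `session.rollback()` is called.

```python
import sqlite3


class PendingRollback(Exception):
    """Toy stand-in for sqlalchemy.exc.PendingRollbackError."""


class ToyRun:
    """Toy ORM-style object; its run_id is reloaded from the DB once expired."""

    def __init__(self, session, run_id):
        self._session = session
        self._run_id = run_id
        self.expired = False

    @property
    def run_id(self):
        if self.expired:
            # Like a lazy reload: needs a usable session/transaction.
            self._run_id = self._session.reload_run_id(self._run_id)
            self.expired = False
        return self._run_id


class ToySession:
    """Toy unit-of-work session; NOT how SQLAlchemy is implemented."""

    def __init__(self, conn):
        self.conn = conn
        self.needs_rollback = False
        self.loaded = []  # objects handed out by this session

    def _assert_active(self):
        if self.needs_rollback:
            raise PendingRollback("call rollback() before using this session")

    def get_run(self, run_id):
        self._assert_active()
        run = ToyRun(self, run_id)
        self.loaded.append(run)
        return run

    def reload_run_id(self, run_id):
        self._assert_active()
        row = self.conn.execute(
            "SELECT run_id FROM dag_run WHERE run_id = ?", (run_id,)
        ).fetchone()
        return row[0]

    def bulk_insert(self, rows):
        self._assert_active()
        try:
            self.conn.executemany("INSERT INTO task_instance VALUES (?, ?)", rows)
        except sqlite3.IntegrityError:
            # Undo the DB work, expire loaded objects, and refuse further
            # use until the caller explicitly calls rollback().
            self.conn.rollback()
            for obj in self.loaded:
                obj.expired = True
            self.needs_rollback = True
            raise

    def rollback(self):
        self.conn.rollback()
        self.needs_rollback = False


def create_tis(rollback_first):
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE dag_run (run_id TEXT PRIMARY KEY)")
    conn.execute(
        "CREATE TABLE task_instance"
        " (run_id TEXT, map_index INTEGER, PRIMARY KEY (run_id, map_index))"
    )
    conn.execute("INSERT INTO dag_run VALUES ('scheduled__1')")
    conn.execute("INSERT INTO task_instance VALUES ('scheduled__1', 1)")
    conn.commit()

    session = ToySession(conn)
    run = session.get_run("scheduled__1")
    try:
        session.bulk_insert([("scheduled__1", 1)])  # duplicate primary key
    except sqlite3.IntegrityError:
        if rollback_first:
            session.rollback()
        return run.run_id  # what the log call in the except block reads


print(create_tis(rollback_first=True))  # scheduled__1
try:
    create_tis(rollback_first=False)
except PendingRollback as exc:
    print("PendingRollback:", exc)
```

Reading `run.run_id` before the rollback reproduces the shape of the crash, including Python's "During handling of the above exception, another exception occurred" chaining. Calling `rollback()` first lets the reload succeed. In this toy, the rollback does not detach the object, which matches how already-persistent objects behave in SQLAlchemy. It only makes the session usable again so the expired value can be reloaded.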



[GitHub] [airflow] ephraimbuddy commented on pull request #25532: Move session.rollback closer to the exception

ephraimbuddy commented on PR #25532:
URL: https://github.com/apache/airflow/pull/25532#issuecomment-1205363343

   @potiuk You are right: I had to restart Breeze and test again. With just `self.log(...)` above the rollback, it didn't crash, and the integrity error was caught.



[GitHub] [airflow] ephraimbuddy commented on pull request #25532: Move session.rollback closer to the exception

ephraimbuddy commented on PR #25532:
URL: https://github.com/apache/airflow/pull/25532#issuecomment-1205197171

   Still failed. I moved the log after the other log that used the `run_id`. That's this log: `self.log.info('Doing session rollback.')` before the session.rollback but it still failed. I think the issue is that we access `self` to get the log?



[GitHub] [airflow] potiuk commented on pull request #25532: Move session.rollback closer to the exception

potiuk commented on PR #25532:
URL: https://github.com/apache/airflow/pull/25532#issuecomment-1205113522

   BTW, at first look, the crash could be caused by the secret masker accessing some DB object held by the IntegrityError. The difference vs. the original implementation was that our logging (which may do secret masking) walks through all objects in the exception as well. It could therefore have accessed something that would be released by `rollback()`. That could explain why you cannot reproduce it in tests.
   
   An easy way to verify that hypothesis is to disable secret masking and repeat your tests, @ephraimbuddy. To be sure, remove it from the logging config rather than via the Airflow configuration.
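The secret-masker hypothesis relies on logging filters running synchronously inside the `log.info()` call. Here is a minimal stdlib sketch of that mechanism. `WalkArgsFilter` is a hypothetical filter standing in for Airflow's secret masker, whose internals are not shown in this thread. An exception raised while a handler filter inspects the record propagates straight back to the caller.

```python
import logging


class Touchy:
    """Hypothetical object whose attribute access fails, like an expired ORM row."""

    @property
    def value(self):
        raise RuntimeError("attribute reload failed")


class WalkArgsFilter(logging.Filter):
    """Hypothetical filter that inspects every argument of every record."""

    def filter(self, record):
        for arg in record.args or ():
            getattr(arg, "value", None)  # a default only swallows AttributeError
        return True


log = logging.getLogger("filter-demo")
log.propagate = False
handler = logging.StreamHandler()
handler.addFilter(WalkArgsFilter())
log.addHandler(handler)
log.setLevel(logging.INFO)

try:
    log.info("object: %s", Touchy())
    crashed = False
except RuntimeError:
    crashed = True

print(crashed)  # True
```

By contrast, errors raised while a standard handler *formats or emits* a record are normally routed to `Handler.handleError` and printed rather than raised. So where in the logging pipeline an object is touched matters.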



[GitHub] [airflow] potiuk commented on pull request #25532: Move session.rollback closer to the exception

potiuk commented on PR #25532:
URL: https://github.com/apache/airflow/pull/25532#issuecomment-1205280168

   If that's the case, then moving `rollback()` earlier is "good", because it effectively detaches the object from the session. The object is then not going to be refreshed even if it is dirty and modified.



[GitHub] [airflow] ephraimbuddy commented on pull request #25532: Move session.rollback closer to the exception

ephraimbuddy commented on PR #25532:
URL: https://github.com/apache/airflow/pull/25532#issuecomment-1205156315

   > Nothing obvious - but it would be great to verify these two hypotheses because maybe we can discover a pattern that we should avoid (or even forbid via pre-commit).
   
   I removed `exc_info` and it still failed. It's the logging: once I move the rollback before the log call, it doesn't crash.



[GitHub] [airflow] potiuk commented on pull request #25532: Move session.rollback closer to the exception

potiuk commented on PR #25532:
URL: https://github.com/apache/airflow/pull/25532#issuecomment-1206317443

   Perfect!



[GitHub] [airflow] ephraimbuddy merged pull request #25532: Fix "This Session's transaction has been rolled back"

ephraimbuddy merged PR #25532:
URL: https://github.com/apache/airflow/pull/25532



[GitHub] [airflow] potiuk commented on pull request #25532: Move session.rollback closer to the exception

potiuk commented on PR #25532:
URL: https://github.com/apache/airflow/pull/25532#issuecomment-1205380402

   > @potiuk You are right: I had to restart Breeze and test again. With just `self.log(...)` above the rollback, it didn't crash, and the integrity error was caught.
   
   So we have a VERY plausible explanation then :).



[GitHub] [airflow] ephraimbuddy commented on pull request #25532: Move session.rollback closer to the exception

ephraimbuddy commented on PR #25532:
URL: https://github.com/apache/airflow/pull/25532#issuecomment-1205117226

   > What was the crash you experienced?
   
   Here's the log:
   ```
   [2022-08-04 11:12:57,341] {process_utils.py:76} INFO - Process psutil.Process(pid=22647, status='terminated', exitcode=0, started='11:12:49') (22647) terminated with exit code 0
   [2022-08-04 11:12:57,342] {scheduler_job.py:779} INFO - Exited execute loop
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1803, in _execute_context
       cursor, statement, parameters, context
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
   [2022-08-04 11:12:57 +0000] [22509] [INFO] Handling signal: term
       cursor.execute(statement, parameters)
   psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "task_instance_pkey"
   DETAIL:  Key (dag_id, task_id, run_id, map_index)=(mvp_map_task_bug, download_files, scheduled__2022-08-03T00:00:00+00:00, 1) already exists.
   
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/opt/airflow/airflow/models/dagrun.py", line 1143, in _create_task_instances
   [2022-08-04 11:12:57 +0000] [22510] [INFO] Worker exiting (pid: 22510)
   [2022-08-04 11:12:57 +0000] [22563] [INFO] Worker exiting (pid: 22563)
       session.bulk_insert_mappings(TI, tasks)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 3711, in bulk_insert_mappings
       render_nulls,
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 3811, in _bulk_save_mappings
       transaction.rollback(_capture_exception=True)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 72, in __exit__
       with_traceback=exc_tb,
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
       raise exception
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 3805, in _bulk_save_mappings
       render_nulls,
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 112, in _bulk_insert
       bookkeeping=return_defaults,
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1097, in _emit_insert_statements
       statement, multiparams, execution_options=execution_options
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1614, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 326, in _execute_on_connection
       self, multiparams, params, execution_options
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1491, in _execute_clauseelement
       cache_hit=cache_hit,
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context
       e, statement, parameters, cursor, context
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 2027, in _handle_dbapi_exception
       sqlalchemy_exception, with_traceback=exc_info[2], from_=e
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
       raise exception
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1803, in _execute_context
       cursor, statement, parameters, context
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
       cursor.execute(statement, parameters)
   sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "task_instance_pkey"
   DETAIL:  Key (dag_id, task_id, run_id, map_index)=(mvp_map_task_bug, download_files, scheduled__2022-08-03T00:00:00+00:00, 1) already exists.
   
   [SQL: INSERT INTO task_instance (task_id, dag_id, run_id, map_index, try_number, max_tries, hostname, unixname, pool, pool_slots, queue, priority_weight, operator, executor_config) VALUES (%(task_id)s, %(dag_id)s, %(run_id)s, %(map_index)s, %(try_number)s, %(max_tries)s, %(hostname)s, %(unixname)s, %(pool)s, %(pool_slots)s, %(queue)s, %(priority_weight)s, %(operator)s, %(executor_config)s)]
   [parameters: {'task_id': 'download_files', 'dag_id': 'mvp_map_task_bug', 'run_id': 'scheduled__2022-08-03T00:00:00+00:00', 'map_index': 1, 'try_number': 0, 'max_tries': 0, 'hostname': '', 'unixname': 'root', 'pool': 'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': '_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary object at 0xffff7a066630>}]
   (Background on this error at: https://sqlalche.me/e/14/gkpj)
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/usr/local/bin/airflow", line 33, in <module>
       sys.exit(load_entry_point('apache-airflow', 'console_scripts', 'airflow')())
     File "/opt/airflow/airflow/__main__.py", line 38, in main
       args.func(args)
     File "/opt/airflow/airflow/cli/cli_parser.py", line 51, in command
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/utils/cli.py", line 94, in wrapper
       return f(*args, **kwargs)
     File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 84, in scheduler
       _run_scheduler_job(args=args)
     File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 50, in _run_scheduler_job
       job.run()
     File "/opt/airflow/airflow/jobs/base_job.py", line 244, in run
       self._execute()
     File "/opt/airflow/airflow/jobs/scheduler_job.py", line 750, in _execute
       self._run_scheduler_loop()
     File "/opt/airflow/airflow/jobs/scheduler_job.py", line 859, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File "/opt/airflow/airflow/jobs/scheduler_job.py", line 941, in _do_scheduling
       callback_to_run = self._schedule_dag_run(dag_run, session)
     File "/opt/airflow/airflow/jobs/scheduler_job.py", line 1177, in _schedule_dag_run
       schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
     File "/opt/airflow/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/models/dagrun.py", line 527, in update_state
       info = self.task_instance_scheduling_decisions(session)
     File "/opt/airflow/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/models/dagrun.py", line 741, in task_instance_scheduling_decisions
       self.verify_integrity(missing_indexes=missing_indexes, session=session)
     File "/opt/airflow/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/models/dagrun.py", line 950, in verify_integrity
       self._create_task_instances(dag.dag_id, tasks, created_counts, hook_is_noop, session=session)
     File "/opt/airflow/airflow/models/dagrun.py", line 1154, in _create_task_instances
       self.run_id,
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/attributes.py", line 481, in __get__
       return self.impl.get(state, dict_)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/attributes.py", line 926, in get
       value = self._fire_loader_callables(state, key, passive)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/attributes.py", line 957, in _fire_loader_callables
       return state._load_expired(state, passive)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/state.py", line 710, in _load_expired
       self.manager.expired_attribute_loader(self, toload, passive)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 1459, in load_scalar_attributes
       no_autoflush=no_autoflush,
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 418, in load_on_ident
       execution_options=execution_options,
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 534, in load_on_pk_identity
       bind_arguments=bind_arguments,
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1688, in execute
       conn = self._connection_for_bind(bind)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1530, in _connection_for_bind
       engine, execution_options
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 721, in _connection_for_bind
       self._assert_active()
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 608, in _assert_active
       code="7s2a",
   sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "task_instance_pkey"
   DETAIL:  Key (dag_id, task_id, run_id, map_index)=(mvp_map_task_bug, download_files, scheduled__2022-08-03T00:00:00+00:00, 1) already exists.
   
   [SQL: INSERT INTO task_instance (task_id, dag_id, run_id, map_index, try_number, max_tries, hostname, unixname, pool, pool_slots, queue, priority_weight, operator, executor_config) VALUES (%(task_id)s, %(dag_id)s, %(run_id)s, %(map_index)s, %(try_number)s, %(max_tries)s, %(hostname)s, %(unixname)s, %(pool)s, %(pool_slots)s, %(queue)s, %(priority_weight)s, %(operator)s, %(executor_config)s)]
   [parameters: {'task_id': 'download_files', 'dag_id': 'mvp_map_task_bug', 'run_id': 'scheduled__2022-08-03T00:00:00+00:00', 'map_index': 1, 'try_number': 0, 'max_tries': 0, 'hostname': '', 'unixname': 'root', 'pool': 'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': '_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary object at 0xffff7a066630>}]
   (Background on this error at: https://sqlalche.me/e/14/gkpj) (Background on this error at: https://sqlalche.me/e/14/7s2a)
   [2022-08-04 11:12:57 +0000] [22509] [INFO] Shutting down: Master
   ```



[GitHub] [airflow] potiuk commented on pull request #25532: Move session.rollback closer to the exception

potiuk commented on PR #25532:
URL: https://github.com/apache/airflow/pull/25532#issuecomment-1205263735

   I think such an error would only be possible (with the last code) if the DagRun object was created in a DIFFERENT session than the one that has just been rolled back.

