Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/08/04 01:05:27 UTC

[GitHub] [airflow] sukso96100 commented on issue #25060: Airflow scheduler crashes due to 'duplicate key value violates unique constraint "task_instance_pkey"'

sukso96100 commented on issue #25060:
URL: https://github.com/apache/airflow/issues/25060#issuecomment-1204644351

   I'm also hitting this issue on 2.3.3 with dynamic task mapping, but with SQL Server as the metadata database. The error in the log is essentially the same: inserting a duplicate key into 'dbo.task_instance' is not possible because it violates the constraint 'task_instance_pkey'. After the error occurs, like for the other people here, tasks and DAGs stay stuck in the "running" state until the Airflow banner notifies me that there is no scheduler heartbeat. A minimal sketch of the mapping pattern follows the environment details below.
   
   More info on my Airflow environment:
    - Kubernetes (Azure Kubernetes Service)
    - Using CeleryKubernetesExecutor
    - SQL Server as metadata database
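   
   For context, here is a minimal sketch of the dynamic task mapping pattern in use. This is a hypothetical illustration only: the real az_partner_etl_usage DAG is not reproduced in this report, though the log below shows 24 mapped instances of azplan_unbilled_lineitems.
   
   ```python
   # Hypothetical sketch of dynamic task mapping (Airflow 2.3.x TaskFlow API).
   # Names mirror identifiers from the log; the task bodies are placeholders.
   from datetime import datetime
   
   from airflow.decorators import dag, task
   
   
   @dag(schedule_interval="@daily", start_date=datetime(2022, 8, 1), catchup=False)
   def az_partner_etl_usage():
       @task
       def list_unbilled_lineitems():
           # Placeholder: the real task would return one element per line item.
           return [f"item-{i}" for i in range(24)]
   
       @task
       def azplan_unbilled_lineitems(item: str):
           print(f"processing {item}")
   
       # .expand() creates one mapped task instance per input element; each is
       # stored as a task_instance row keyed by (dag_id, task_id, run_id,
       # map_index), the primary key named in the error below.
       azplan_unbilled_lineitems.expand(item=list_unbilled_lineitems())
   
   
   az_partner_etl_usage()
   ```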
   
   ```bash
   sqlalchemy.exc.IntegrityError: (pyodbc.IntegrityError) ('23000', "[23000] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]Violation of PRIMARY KEY constraint 'task_instance_pkey'. Cannot insert duplicate key in object 'dbo.task_instance'. The duplicate key value is (az_partner_etl_usage, azplan_unbilled_lineitems, scheduled__2022-08-02T00:00:00+00:00, 0). (2627) (SQLExecDirectW)")
   ```
   > Error code 2627 indicates a violation of a PRIMARY KEY constraint; see the [list of SQL Server error codes](https://docs.microsoft.com/en-us/sql/relational-databases/errors-events/database-engine-events-and-errors?view=sql-server-ver16).
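   
   The statement that fails (visible at the bottom of the full traceback below) is an UPDATE that promotes the unmapped placeholder row from map_index = -1 to map_index = 0 during task expansion, so the primary key violation implies a row with map_index = 0 already exists for the same (dag_id, task_id, run_id). Here is a read-only sketch for inspecting the conflicting rows, assuming direct access to the metadata database (the connection string is a placeholder, not my real DSN):
   
   ```python
   # Read-only diagnostic sketch: list all task_instance rows for the affected
   # task, so the map_index = -1 placeholder and any pre-existing map_index = 0
   # row can be seen side by side. CONN_URI is a placeholder.
   from sqlalchemy import create_engine, text
   
   CONN_URI = "mssql+pyodbc://user:password@host/airflow?driver=ODBC+Driver+18+for+SQL+Server"
   
   engine = create_engine(CONN_URI)
   with engine.connect() as conn:
       rows = conn.execute(
           text(
               "SELECT task_id, map_index, state FROM task_instance "
               "WHERE dag_id = :dag_id AND task_id = :task_id AND run_id = :run_id "
               "ORDER BY map_index"
           ),
           {
               "dag_id": "az_partner_etl_usage",
               "task_id": "azplan_unbilled_lineitems",
               "run_id": "scheduled__2022-08-02T00:00:00+00:00",
           },
       )
       for task_id, map_index, state in rows:
           print(task_id, map_index, state)
   ```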
   
   <details>
     <summary>Click to expand the log</summary>
     
   ```bash
   
   /home/airflow/.local/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py:52 DeprecationWarning: Passing filename_template to FileTaskHandler is deprecated and has no effect
     ____________       _____________
    ____    |__( )_________  __/__  /________      __
   ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
   ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
    _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   [2022-08-04 00:36:48,513] {scheduler_job.py:708} INFO - Starting the scheduler
   [2022-08-04 00:36:48,513] {scheduler_job.py:713} INFO - Processing each file at most -1 times
   [2022-08-04 00:36:48,600] {default_celery.py:97} WARNING - You have configured a result_backend of redis://:gDkWKFckB2@airflow-redis:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
   [2022-08-04 00:36:48,667] {kubernetes_executor.py:520} INFO - Start Kubernetes executor
   [2022-08-04 00:36:48,707] {kubernetes_executor.py:128} INFO - Event: and now my watch begins starting at resource_version: 0
   [2022-08-04 00:36:48,726] {kubernetes_executor.py:469} INFO - Found 0 queued task instances
   [2022-08-04 00:36:48,735] {manager.py:160} INFO - Launched DagFileProcessorManager with pid: 33
   [2022-08-04 00:36:48,737] {scheduler_job.py:1233} INFO - Resetting orphaned tasks for active dag runs
   [2022-08-04 00:36:48,750] {settings.py:55} INFO - Configured default timezone Timezone('UTC')
   [2022-08-04 00:36:48,757] {settings.py:540} INFO - Loaded airflow_local_settings from /opt/airflow/config/airflow_local_settings.py .
   /home/airflow/.local/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py:52 DeprecationWarning: Passing filename_template to FileTaskHandler is deprecated and has no effect
   [2022-08-04 00:36:50,327] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 [None]>'
   [2022-08-04 00:36:50,328] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=0 [None]>'
   [2022-08-04 00:36:50,329] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=1 [None]>'
   [2022-08-04 00:36:50,331] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=2 [None]>'
   [2022-08-04 00:36:50,332] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=3 [None]>'
   [2022-08-04 00:36:50,332] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=4 [None]>'
   [2022-08-04 00:36:50,333] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=5 [None]>'
   [2022-08-04 00:36:50,333] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=6 [None]>'
   [2022-08-04 00:36:50,335] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=7 [None]>'
   [2022-08-04 00:36:50,335] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=8 [None]>'
   [2022-08-04 00:36:50,336] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=9 [None]>'
   [2022-08-04 00:36:50,336] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=10 [None]>'
   [2022-08-04 00:36:50,338] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=11 [None]>'
   [2022-08-04 00:36:50,338] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=12 [None]>'
   [2022-08-04 00:36:50,339] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=13 [None]>'
   [2022-08-04 00:36:50,340] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=14 [None]>'
   [2022-08-04 00:36:50,340] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=15 [None]>'
   [2022-08-04 00:36:50,343] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=16 [None]>'
   [2022-08-04 00:36:50,343] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=17 [None]>'
   [2022-08-04 00:36:50,344] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=18 [None]>'
   [2022-08-04 00:36:50,344] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=19 [None]>'
   [2022-08-04 00:36:50,345] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=20 [None]>'
   [2022-08-04 00:36:50,345] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=21 [None]>'
   [2022-08-04 00:36:50,346] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=22 [None]>'
   [2022-08-04 00:36:50,346] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=23 [None]>'
   [2022-08-04 00:36:50,711] {scheduler_job.py:768} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
       self.dialect.do_execute(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
       cursor.execute(statement, parameters)
   pyodbc.IntegrityError: ('23000', "[23000] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]Violation of PRIMARY KEY constraint 'task_instance_pkey'. Cannot insert duplicate key in object 'dbo.task_instance'. The duplicate key value is (az_partner_etl_usage, azplan_unbilled_lineitems, scheduled__2022-08-02T00:00:00+00:00, 0). (2627) (SQLExecDirectW)")
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 751, in _execute
       self._run_scheduler_loop()
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 839, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 921, in _do_scheduling
       callback_to_run = self._schedule_dag_run(dag_run, session)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1163, in _schedule_dag_run
       schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 524, in update_state
       info = self.task_instance_scheduling_decisions(session)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 654, in task_instance_scheduling_decisions
       schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 710, in _get_ready_tis
       expanded_tis, _ = schedulable.task.expand_mapped_task(self.run_id, session=session)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/mappedoperator.py", line 683, in expand_mapped_task
       session.flush()
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3345, in flush
       self._flush(objects)
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3484, in _flush
       with util.safe_reraise():
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
       compat.raise_(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
       raise exception
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3445, in _flush
       flush_context.execute()
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
       rec.execute(self)
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
       util.preloaded.orm_persistence.save_obj(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 236, in save_obj
       _emit_update_statements(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 1000, in _emit_update_statements
       c = connection._execute_20(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1614, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 325, in _execute_on_connection
       return connection._execute_clauseelement(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1481, in _execute_clauseelement
       ret = self._execute_context(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1845, in _execute_context
       self._handle_dbapi_exception(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2026, in _handle_dbapi_exception
       util.raise_(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
       raise exception
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
       self.dialect.do_execute(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
       cursor.execute(statement, parameters)
   sqlalchemy.exc.IntegrityError: (pyodbc.IntegrityError) ('23000', "[23000] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]Violation of PRIMARY KEY constraint 'task_instance_pkey'. Cannot insert duplicate key in object 'dbo.task_instance'. The duplicate key value is (az_partner_etl_usage, azplan_unbilled_lineitems, scheduled__2022-08-02T00:00:00+00:00, 0). (2627) (SQLExecDirectW)")
   [SQL: UPDATE task_instance SET map_index=? WHERE task_instance.task_id = ? AND task_instance.dag_id = ? AND task_instance.run_id = ? AND task_instance.map_index = ?]
   [parameters: (0, 'azplan_unbilled_lineitems', 'az_partner_etl_usage', 'scheduled__2022-08-02T00:00:00+00:00', -1)]
   (Background on this error at: https://sqlalche.me/e/14/gkpj)
   [2022-08-04 00:36:50,724] {kubernetes_executor.py:821} INFO - Shutting down Kubernetes executor
   [2022-08-04 00:36:51,788] {process_utils.py:125} INFO - Sending Signals.SIGTERM to group 33. PIDs of all processes in the group: [121, 122, 33]
   [2022-08-04 00:36:51,788] {process_utils.py:80} INFO - Sending the signal Signals.SIGTERM to group 33
   [2022-08-04 00:36:52,868] {process_utils.py:240} INFO - Waiting up to 5 seconds for processes to exit...
   [2022-08-04 00:36:52,875] {process_utils.py:75} INFO - Process psutil.Process(pid=121, status='terminated', started='00:36:51') (121) terminated with exit code None
   [2022-08-04 00:36:52,875] {process_utils.py:75} INFO - Process psutil.Process(pid=122, status='terminated', started='00:36:51') (122) terminated with exit code None
   [2022-08-04 00:36:52,876] {process_utils.py:75} INFO - Process psutil.Process(pid=33, status='terminated', exitcode=0, started='00:36:48') (33) terminated with exit code 0
   [2022-08-04 00:36:52,876] {scheduler_job.py:780} INFO - Exited execute loop
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
       self.dialect.do_execute(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
       cursor.execute(statement, parameters)
   pyodbc.IntegrityError: ('23000', "[23000] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]Violation of PRIMARY KEY constraint 'task_instance_pkey'. Cannot insert duplicate key in object 'dbo.task_instance'. The duplicate key value is (az_partner_etl_usage, azplan_unbilled_lineitems, scheduled__2022-08-02T00:00:00+00:00, 0). (2627) (SQLExecDirectW)")
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/home/airflow/.local/bin/airflow", line 8, in <module>
       sys.exit(main())
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/__main__.py", line 38, in main
       args.func(args)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/cli_parser.py", line 51, in command
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/cli.py", line 99, in wrapper
       return f(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
       _run_scheduler_job(args=args)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
       job.run()
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/base_job.py", line 244, in run
       self._execute()
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 751, in _execute
       self._run_scheduler_loop()
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 839, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 921, in _do_scheduling
       callback_to_run = self._schedule_dag_run(dag_run, session)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1163, in _schedule_dag_run
       schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 524, in update_state
       info = self.task_instance_scheduling_decisions(session)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 654, in task_instance_scheduling_decisions
       schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 710, in _get_ready_tis
       expanded_tis, _ = schedulable.task.expand_mapped_task(self.run_id, session=session)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/mappedoperator.py", line 683, in expand_mapped_task
       session.flush()
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3345, in flush
       self._flush(objects)
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3484, in _flush
       with util.safe_reraise():
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
       compat.raise_(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
       raise exception
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3445, in _flush
       flush_context.execute()
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
       rec.execute(self)
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
       util.preloaded.orm_persistence.save_obj(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 236, in save_obj
       _emit_update_statements(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 1000, in _emit_update_statements
       c = connection._execute_20(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1614, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 325, in _execute_on_connection
       return connection._execute_clauseelement(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1481, in _execute_clauseelement
       ret = self._execute_context(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1845, in _execute_context
       self._handle_dbapi_exception(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2026, in _handle_dbapi_exception
       util.raise_(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
       raise exception
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
       self.dialect.do_execute(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
       cursor.execute(statement, parameters)
   sqlalchemy.exc.IntegrityError: (pyodbc.IntegrityError) ('23000', "[23000] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]Violation of PRIMARY KEY constraint 'task_instance_pkey'. Cannot insert duplicate key in object 'dbo.task_instance'. The duplicate key value is (az_partner_etl_usage, azplan_unbilled_lineitems, scheduled__2022-08-02T00:00:00+00:00, 0). (2627) (SQLExecDirectW)")
   [SQL: UPDATE task_instance SET map_index=? WHERE task_instance.task_id = ? AND task_instance.dag_id = ? AND task_instance.run_id = ? AND task_instance.map_index = ?]
   [parameters: (0, 'azplan_unbilled_lineitems', 'az_partner_etl_usage', 'scheduled__2022-08-02T00:00:00+00:00', -1)]
   (Background on this error at: https://sqlalche.me/e/14/gkpj)
   
   /home/airflow/.local/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py:52 DeprecationWarning: Passing filename_template to FileTaskHandler is deprecated and has no effect
     ____________       _____________
    ____    |__( )_________  __/__  /________      __
   ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
   ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
    _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   [2022-08-04 00:42:09,592] {scheduler_job.py:708} INFO - Starting the scheduler
   [2022-08-04 00:42:09,592] {scheduler_job.py:713} INFO - Processing each file at most -1 times
   [2022-08-04 00:42:09,698] {default_celery.py:97} WARNING - You have configured a result_backend of redis://:gDkWKFckB2@airflow-redis:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
   [2022-08-04 00:42:09,775] {kubernetes_executor.py:520} INFO - Start Kubernetes executor
   [2022-08-04 00:42:09,816] {kubernetes_executor.py:128} INFO - Event: and now my watch begins starting at resource_version: 0
   [2022-08-04 00:42:09,836] {kubernetes_executor.py:469} INFO - Found 0 queued task instances
   [2022-08-04 00:42:09,845] {manager.py:160} INFO - Launched DagFileProcessorManager with pid: 33
   [2022-08-04 00:42:09,847] {scheduler_job.py:1233} INFO - Resetting orphaned tasks for active dag runs
   [2022-08-04 00:42:09,863] {settings.py:55} INFO - Configured default timezone Timezone('UTC')
   [2022-08-04 00:42:09,868] {settings.py:540} INFO - Loaded airflow_local_settings from /opt/airflow/config/airflow_local_settings.py .
   /home/airflow/.local/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py:52 DeprecationWarning: Passing filename_template to FileTaskHandler is deprecated and has no effect
   [2022-08-04 00:42:12,059] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 [None]>'
   [2022-08-04 00:42:12,060] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=0 [None]>'
   [2022-08-04 00:42:12,060] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=1 [None]>'
   [2022-08-04 00:42:12,061] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=2 [None]>'
   [2022-08-04 00:42:12,061] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=3 [None]>'
   [2022-08-04 00:42:12,061] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=4 [None]>'
   [2022-08-04 00:42:12,062] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=5 [None]>'
   [2022-08-04 00:42:12,063] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=6 [None]>'
   [2022-08-04 00:42:12,063] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=7 [None]>'
   [2022-08-04 00:42:12,067] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=8 [None]>'
   [2022-08-04 00:42:12,067] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=9 [None]>'
   [2022-08-04 00:42:12,068] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=10 [None]>'
   [2022-08-04 00:42:12,068] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=11 [None]>'
   [2022-08-04 00:42:12,068] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=12 [None]>'
   [2022-08-04 00:42:12,069] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=13 [None]>'
   [2022-08-04 00:42:12,069] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=14 [None]>'
   [2022-08-04 00:42:12,070] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=15 [None]>'
   [2022-08-04 00:42:12,070] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=16 [None]>'
   [2022-08-04 00:42:12,071] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=17 [None]>'
   [2022-08-04 00:42:12,071] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=18 [None]>'
   [2022-08-04 00:42:12,072] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=19 [None]>'
   [2022-08-04 00:42:12,075] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=20 [None]>'
   [2022-08-04 00:42:12,075] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=21 [None]>'
   [2022-08-04 00:42:12,076] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=22 [None]>'
   [2022-08-04 00:42:12,076] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=23 [None]>'
   [2022-08-04 00:42:13,698] {process_utils.py:125} INFO - Sending Signals.SIGTERM to group 33. PIDs of all processes in the group: [118, 120, 33]
   [2022-08-04 00:42:13,698] {process_utils.py:80} INFO - Sending the signal Signals.SIGTERM to group 33
   [2022-08-04 00:42:14,155] {process_utils.py:240} INFO - Waiting up to 5 seconds for processes to exit...
   [2022-08-04 00:42:14,161] {process_utils.py:240} INFO - Waiting up to 5 seconds for processes to exit...
   [2022-08-04 00:42:14,208] {process_utils.py:75} INFO - Process psutil.Process(pid=33, status='terminated', exitcode=0, started='00:42:09') (33) terminated with exit code 0
   [2022-08-04 00:42:14,209] {process_utils.py:75} INFO - Process psutil.Process(pid=118, status='terminated', started='00:42:12') (118) terminated with exit code None
   [2022-08-04 00:42:14,209] {process_utils.py:75} INFO - Process psutil.Process(pid=120, status='terminated', started='00:42:13') (120) terminated with exit code None
   [2022-08-04 00:42:14,210] {scheduler_job.py:780} INFO - Exited execute loop
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
       self.dialect.do_execute(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
       cursor.execute(statement, parameters)
   pyodbc.IntegrityError: ('23000', "[23000] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]Violation of PRIMARY KEY constraint 'task_instance_pkey'. Cannot insert duplicate key in object 'dbo.task_instance'. The duplicate key value is (az_partner_etl_usage, azplan_unbilled_lineitems, scheduled__2022-08-02T00:00:00+00:00, 0). (2627) (SQLExecDirectW)")
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/home/airflow/.local/bin/airflow", line 8, in <module>
       sys.exit(main())
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/__main__.py", line 38, in main
       args.func(args)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/cli_parser.py", line 51, in command
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/cli.py", line 99, in wrapper
       return f(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
       _run_scheduler_job(args=args)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
       job.run()
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/base_job.py", line 244, in run
       self._execute()
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 751, in _execute
       self._run_scheduler_loop()
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 839, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 921, in _do_scheduling
       callback_to_run = self._schedule_dag_run(dag_run, session)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1163, in _schedule_dag_run
       schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 524, in update_state
       info = self.task_instance_scheduling_decisions(session)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 654, in task_instance_scheduling_decisions
       schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 710, in _get_ready_tis
       expanded_tis, _ = schedulable.task.expand_mapped_task(self.run_id, session=session)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/mappedoperator.py", line 683, in expand_mapped_task
       session.flush()
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3345, in flush
       self._flush(objects)
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3484, in _flush
       with util.safe_reraise():
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
       compat.raise_(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
       raise exception
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3445, in _flush
       flush_context.execute()
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
       rec.execute(self)
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
       util.preloaded.orm_persistence.save_obj(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 236, in save_obj
       _emit_update_statements(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 1000, in _emit_update_statements
       c = connection._execute_20(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1614, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 325, in _execute_on_connection
       return connection._execute_clauseelement(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1481, in _execute_clauseelement
       ret = self._execute_context(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1845, in _execute_context
       self._handle_dbapi_exception(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2026, in _handle_dbapi_exception
       util.raise_(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
       raise exception
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
       self.dialect.do_execute(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
       cursor.execute(statement, parameters)
   sqlalchemy.exc.IntegrityError: (pyodbc.IntegrityError) ('23000', "[23000] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]Violation of PRIMARY KEY constraint 'task_instance_pkey'. Cannot insert duplicate key in object 'dbo.task_instance'. The duplicate key value is (az_partner_etl_usage, azplan_unbilled_lineitems, scheduled__2022-08-02T00:00:00+00:00, 0). (2627) (SQLExecDirectW)")
   [SQL: UPDATE task_instance SET map_index=? WHERE task_instance.task_id = ? AND task_instance.dag_id = ? AND task_instance.run_id = ? AND task_instance.map_index = ?]
   [parameters: (0, 'azplan_unbilled_lineitems', 'az_partner_etl_usage', 'scheduled__2022-08-02T00:00:00+00:00', -1)]
   (Background on this error at: https://sqlalche.me/e/14/gkpj)
   ```
   </details>

