Posted to commits@airflow.apache.org by "ImmortalLotus (via GitHub)" <gi...@apache.org> on 2023/10/25 16:31:05 UTC
[I] Airflow Can't Terminate Tasks that are created by a DAG that uses DAG Decorator. [airflow]
ImmortalLotus opened a new issue, #35179:
URL: https://github.com/apache/airflow/issues/35179
### Apache Airflow version
2.7.2
### What happened
Airflow throws this error when trying to terminate a task that comes from a DAG created with the DAG decorator, and it then cannot terminate the pod when using the Kubernetes Executor.
The task itself runs correctly; however, when the pod is being killed, Airflow throws this error and does not delete the pod.
Exit log:
```
[2023-10-25T13:23:21.856+0000] {local_task_job_runner.py:228} INFO - Task exited with return code 0
Traceback (most recent call last):
File "/home/airflow/.local/bin/airflow", line 8, in <module>
sys.exit(main())
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/__main__.py", line 60, in main
args.func(args)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/cli_config.py", line 49, in command
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 113, in wrapper
return f(*args, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 430, in task_run
task_return_code = _run_task_by_selected_method(args, _dag, ti)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 208, in _run_task_by_selected_method
return _run_task_by_local_task_job(args, ti)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 270, in _run_task_by_local_task_job
ret = run_job(job=job_runner.job, execute_callable=job_runner._execute)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 77, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airf
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/job.py", line 318, in execute_job
ret = execute_callable()
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/local_task_job_runner.py", line 192, in _execute
self.handle_task_exit(return_code)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/local_task_job_runner.py", line 232, in handle_task_exit
self.task_instance.schedule_downstream_tasks(max_tis_per_query=self.job.max_tis_per_query)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 77, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2754, in schedule_downstream_tasks
partial_dag = task.dag.partial_subset(
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 2393, in partial_subset
dag.task_dict = {
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 2394, in <dictcomp>
t.task_id: _deepcopy_task(t)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 2391, in _deepcopy_task
return copy.deepcopy(t, memo)
File "/usr/local/lib/python3.8/copy.py", line 153, in deepcopy
y = copier(memo)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1214, in __deepcopy__
setattr(result, k, copy.deepcopy(v, memo))
File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
y = copier(x, memo)
File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
y = copier(x, memo)
File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/local/lib/python3.8/copy.py", line 172, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/usr/local/lib/python3.8/copy.py", line 270, in _reconstruct
state = deepcopy(state, memo)
File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
y = copier(x, memo)
File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/local/lib/python3.8/copy.py", line 172, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/usr/local/lib/python3.8/copy.py", line 270, in _reconstruct
state = deepcopy(state, memo)
File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
y = copier(x, memo)
File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/local/lib/python3.8/copy.py", line 172, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/usr/local/lib/python3.8/copy.py", line 270, in _reconstruct
state = deepcopy(state, memo)
File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
y = copier(x, memo)
File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/local/lib/python3.8/copy.py", line 172, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/usr/local/lib/python3.8/copy.py", line 270, in _reconstruct
state = deepcopy(state, memo)
File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
y = copier(x, memo)
File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "/usr/local/lib/python3.8/copy.py", line 161, in deepcopy
rv = reductor(4)
TypeError: cannot pickle 'module' object
```
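The last frames of the traceback show `copy.deepcopy` recursing through nested dictionaries and object state until it reaches a module object, which it cannot copy. The sketch below reproduces that failure mode using only the standard library. The `Holder` class and the contents of `op_kwargs` are hypothetical stand-ins for illustration, not Airflow code, and this thread does not confirm that the same thing is the cause here.

```python
import copy
import math  # any module works; it stands in for a module held by a client object


class Holder:
    """Hypothetical object that keeps a reference to a module as an attribute."""

    def __init__(self):
        self.lib = math


# A dict shaped like the op_kwargs in the report: a plain string plus a live object.
op_kwargs = {"tabela": "YourTable", "self": Holder()}

try:
    copy.deepcopy(op_kwargs)
    error_message = None
except TypeError as exc:
    # deepcopy walks the dict, then Holder's state, then fails on the module.
    error_message = str(exc)

print(error_message)
```

If this matches the situation in the report, any operator argument that transitively references a module would trigger the same `TypeError` when the task is deep-copied.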
### What you think should happen instead
It should terminate correctly and delete the pod.
### How to reproduce
Use the following class (which is still not perfect) to generate a DAG like the one below. The purpose of this class is to encapsulate some common usage patterns we have when working with some specific EL jobs:
```
from __future__ import annotations
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from docker.types import Mount
from airflow.operators.dummy_operator import DummyOperator
from kubernetes.client import models as k8s
from airflow.models import Variable
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from typing import TYPE_CHECKING, Any
import httpx
import pendulum
from sqlalchemy import create_engine
from airflow.decorators import dag, task
from airflow.models.baseoperator import BaseOperator
from airflow.operators.email import EmailOperator
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator
if TYPE_CHECKING:
    from airflow.utils.context import Context
# [START dag_decorator_usage]
class sql_dag():
    def __init__(self,nome_servidor_destino, servidor_origem_dict:dict, nome_banco_destino , nome_banco_origem, schema):
        conexao = MsSqlHook.get_connection(nome_servidor_destino)
        self.hook_banco_destino=MsSqlHook(mssql_conn_id=nome_servidor_destino)
        self.engine_destino = create_engine(f'mssql+pyodbc://{conexao.login}:{conexao.password}@{conexao.host}/{nome_banco_destino}?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=YES')
        for chave, valor in servidor_origem_dict.items():
            if(valor=='Postgres'):
                self.hook_banco_origem=PostgresHook(postgres_conn_id=chave)
            elif(valor=='MSSQL'):
                self.hook_banco_origem=MsSqlHook(mssql_conn_id=chave)
            else:
                raise Exception("banco não suportado")
        self.schema = schema
        self.sql_template_padrao=f'use [{nome_banco_origem}] select * from '
        self.truncate_sql=f"""use [{nome_banco_destino}] truncate table """
    def configurar_dag(self,horario:str,nome:str, origem,acao,destino):
        tabela_dict= Variable.get(nome, default_var={"erro":["erro"]}, deserialize_json=True)
        def truncate_tabelas(**kwargs):
            sql3=kwargs['self'].truncate_sql+kwargs['tabela']
            kwargs['self'].hook_banco_destino.run(sql3)
        def insert_tabelas(**kwargs):
            sql4 = kwargs['self'].sql_template_padrao+kwargs['tabela']
            df = kwargs['self'].hook_banco_origem.get_pandas_df(sql4)
            df.to_sql(kwargs['tabela'], schema=kwargs['self'].schema,con=kwargs['self'].engine_destino,if_exists='append', index=False)
        @dag(
            schedule=horario,
            dag_id=nome,
            start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
            catchup=False,
            tags=[origem,acao,destino]
        )
        def criar():
            start = DummyOperator(
                task_id='comecaDag'
            )
            end = DummyOperator(
                task_id='endDag'
            )
            with TaskGroup(group_id=f"insert_tabelas") as insert:
                se_estiver_vazio=DummyOperator(
                    task_id='pra_evitar_erros'
                )
                for chave,valor in tabela_dict.items():
                    if(valor=='padraoSemDtInsert'):
                        task_insert_tabelas = PythonOperator(task_id=f'insert_{chave}',
                            python_callable=insert_tabelas,
                            op_kwargs={"tabela":chave, "self":self}
                        )
            with TaskGroup(group_id=f"truncate_tabelas") as truncate:
                se_estiver_vazio2=DummyOperator(
                    task_id='pra_evitar_erros2'
                )
                for chave, valor in tabela_dict.items():
                    task_truncate_tabelas = PythonOperator(task_id=f'truncate_{chave}',
                        python_callable=truncate_tabelas,
                        op_kwargs={"tabela":chave, "self":self}
                    )
            start>>truncate>>insert>>end
        dag_teste=criar()
```
Code that uses the above class; it works with an MSSQL destination and a PostgreSQL or MSSQL source:
```
from cagd_libs.BANCOS_SQL import sql_dag
from airflow.models import Variable
tabela_dict={"YourTable":"padraoSemDtInsert"
}
nome_variavel="ID_Of_the_DAG_YOULL_CREATE"
Variable.set(key=nome_variavel, value=tabela_dict, serialize_json=True)
dag_sql=sql_dag(nome_servidor_destino='ID_OF_THE_DESTINATION_DATABASE_IN_AIRFLOW',
                servidor_origem_dict={'ID_OF_THE_SOURCE_DATABASE_IN_AIRFLOW':'Postgres'},
                nome_banco_destino='destination_database_name',
                nome_banco_origem='source_database_name',
                schema='database_schema')
teste_dag= dag_sql.configurar_dag('airflow_schedule',nome_variavel,"tags_that_we_use","tags_that_we_use","tags_that_we_use")
```
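One possible direction, offered as an assumption rather than a confirmed diagnosis: the `op_kwargs` above pass `self`, which carries live hooks and a SQLAlchemy engine, and the traceback fails while deep-copying nested operator attributes. A common way to keep task arguments copyable is to pass only plain values such as connection IDs and table names, and to build the heavy objects inside the callable at run time. The sketch below illustrates the idea with the standard library only; `FakeHook`, `build_hook`, and `insert_table` are hypothetical names standing in for the real hooks and callables.

```python
import copy
import json  # stands in for any module a live client object might reference


class FakeHook:
    """Hypothetical stand-in for a database hook that holds a module reference."""

    def __init__(self, conn_id):
        self.conn_id = conn_id
        self._driver = json  # a module attribute makes instances non-deep-copyable


def build_hook(conn_id):
    """Create the hook lazily from a plain connection ID."""
    return FakeHook(conn_id)


def insert_table(tabela, conn_id):
    """Callable that receives only plain strings and builds its hook when run."""
    hook = build_hook(conn_id)
    return f"{hook.conn_id}:{tabela}"


# Only plain, copyable values are stored as task arguments.
op_kwargs = {"tabela": "YourTable", "conn_id": "my_conn"}

copied = copy.deepcopy(op_kwargs)  # succeeds: no module is reachable from the dict
result = insert_table(**copied)
print(result)
```

In the class above, this would correspond to passing the connection IDs, schema, and database names in `op_kwargs` instead of `self`, and creating the hooks and engine inside `truncate_tabelas` and `insert_tabelas`.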
### Operating System
Red Hat OpenShift, Airflow Helm chart
### Versions of Apache Airflow Providers
pip install apache-airflow-providers-odbc \
&& pip install apache-airflow-providers-microsoft-mssql
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else
This error occurs on every DAG that is created using the above class. Regardless of whether the task runs correctly or not, the pod throws the log cited above and is then not deleted.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Re: [I] Airflow Can't Terminate Tasks that are created by a DAG that uses DAG Decorator. [airflow]
Posted by "ImmortalLotus (via GitHub)" <gi...@apache.org>.
ImmortalLotus commented on issue #35179:
URL: https://github.com/apache/airflow/issues/35179#issuecomment-1779653517
The main thing is that the DAG runs correctly; however, the pod isn't deleted.
Re: [I] Airflow Can't Terminate Tasks that are created by a DAG that uses DAG Decorator. [airflow]
Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis closed issue #35179: Airflow Can't Terminate Tasks that are created by a DAG that uses DAG Decorator.
URL: https://github.com/apache/airflow/issues/35179
Re: [I] Airflow Can't Terminate Tasks that are created by a DAG that uses DAG Decorator. [airflow]
Posted by "boring-cyborg[bot] (via GitHub)" <gi...@apache.org>.
boring-cyborg[bot] commented on issue #35179:
URL: https://github.com/apache/airflow/issues/35179#issuecomment-1779648317
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.