You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "takersk (via GitHub)" <gi...@apache.org> on 2023/02/20 00:44:38 UTC
[GitHub] [airflow] takersk commented on issue #29561: CeleryKubernetesExecutor @task(queue="kubernetes") not work
takersk commented on issue #29561:
URL: https://github.com/apache/airflow/issues/29561#issuecomment-1436153289
@hussein-awala
yes!!
```
import os
from datetime import datetime, timedelta
from airflow import DAG
from common.enum.enum import DagEnum, DatahubEnum
from common.operator.custom_dummy_operator import CustomDummyOperator
from common.options.trino_options import TrinoStringOptions
from common.utils.date import KST
from common.utils.date import (
execution_date_fmt,
)
from common.utils.pod import (
exec_trino_query_pod,
)
from common.utils.watchtower import WatchTower
from airflow.decorators import task
@task(queue="kubernetes")
def example_task_kubernetes(_dag):
    """Create the sample Trino query task for the "kubernetes" queue.

    The function is wrapped with ``@task(queue="kubernetes")``; it hands a
    fixed sample query to ``exec_trino_query_pod`` and returns whatever that
    helper returns.

    Args:
        _dag: DAG object forwarded to ``exec_trino_query_pod`` as ``dag``.
    """
    # Fixed sample query used by the reproduction.
    options = TrinoStringOptions(
        query_string="select * from kudu.dw.dim_gift_brand limit 10",
    )
    return exec_trino_query_pod(
        dag=_dag,
        task_id="example_task_kubernetes",
        trino_options=options,
    )
def example_task_celery(_dag):
    """Create the sample Trino query task without an explicit queue.

    Plain (undecorated) counterpart of the kubernetes example: it passes the
    same fixed query to ``exec_trino_query_pod`` under the task id
    ``example_task_celery`` and returns the helper's result.

    Args:
        _dag: DAG object forwarded to ``exec_trino_query_pod`` as ``dag``.
    """
    sample_query = "select * from kudu.dw.dim_gift_brand limit 10"
    return exec_trino_query_pod(
        dag=_dag,
        task_id="example_task_celery",
        trino_options=TrinoStringOptions(query_string=sample_query),
    )
# Defaults handed to the DAG below via ``default_args``.
# Numeric limits come from DagEnum members and are coerced with int().
default_args = {
    "owner": DagEnum.DE_OWNER.value,
    "depends_on_past": False,
    # Timezone-aware start date (KST is imported from common.utils.date).
    "start_date": datetime(2022, 10, 6, tzinfo=KST),
    "email_on_failure": False,
    "email_on_retry": False,
    "execution_timeout": timedelta(
        hours=int(DagEnum.EXECUTION_TIMEOUT_HOUR_MAX.value),
    ),
    "retries": int(DagEnum.RETRIES_MID.value),
    # Failure callback intentionally left disabled in this reproduction.
    # "on_failure_callback": WatchTower().fail,
    "retry_delay": timedelta(
        minutes=int(DagEnum.RETRY_DELAY_MIN.value),
    ),
}
with DAG(
    # DAG id is this file's base name with every ".py" occurrence removed.
    dag_id=os.path.basename(__file__).replace(".py", ""),
    default_args=default_args,
    # Cron expression: run every 10 minutes.
    schedule_interval="*/10 * * * *",
    # Expose the date-formatting helper to templates under the same name.
    user_defined_macros={"execution_date_fmt": execution_date_fmt},
    concurrency=int(DagEnum.CONCURRENCY_MID.value),
    max_active_runs=int(DagEnum.MAX_ACTIVE_RUNS_MIN.value),
    catchup=False,
    tags=[
        DatahubEnum.TAG_TRINO.value,
        DatahubEnum.TAG_SPARK.value,
        DatahubEnum.TAG_DISTCP.value,
    ],
) as dag:
    # Boundary markers for the pipeline.
    start_task = CustomDummyOperator(task_id="start", dag=dag)
    end_task = CustomDummyOperator(task_id="end", dag=dag)

    # Wiring: the kubernetes-queue variant is disabled; only the celery
    # variant is placed between the start and end markers.
    # start_task >> example_task_kubernetes(dag) >> end_task
    start_task >> example_task_celery(dag) >> end_task
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org