You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "takersk (via GitHub)" <gi...@apache.org> on 2023/02/20 00:44:38 UTC

[GitHub] [airflow] takersk commented on issue #29561: CeleryKubernetesExecutor @task(queue="kubernetes")  not work

takersk commented on issue #29561:
URL: https://github.com/apache/airflow/issues/29561#issuecomment-1436153289

   @hussein-awala 
   
   yes!!
   
   ```
   import os
   from datetime import datetime, timedelta
   
   from airflow import DAG
   
   from common.enum.enum import DagEnum, DatahubEnum
   from common.operator.custom_dummy_operator import CustomDummyOperator
   from common.options.trino_options import TrinoStringOptions
   from common.utils.date import KST
   from common.utils.date import (
       execution_date_fmt,
   )
   from common.utils.pod import (
       exec_trino_query_pod,
   )
   from common.utils.watchtower import WatchTower
   from airflow.decorators import task
   
   
   @task(queue="kubernetes")
   def example_task_kubernetes(_dag):
       """Run the sample Trino query through ``exec_trino_query_pod``.
   
       Declared as a TaskFlow task on the ``"kubernetes"`` queue.
       NOTE(review): presumably this queue name is what routes the task to the
       Kubernetes side of CeleryKubernetesExecutor -- confirm against the
       deployment's executor configuration.
   
       :param _dag: DAG object forwarded to ``exec_trino_query_pod`` as ``dag``.
       :return: whatever ``exec_trino_query_pod`` returns (not visible here).
       """
       sample_query = "select * from kudu.dw.dim_gift_brand limit 10"
       options = TrinoStringOptions(query_string=sample_query)
       return exec_trino_query_pod(
           dag=_dag,
           task_id="example_task_kubernetes",
           trino_options=options,
       )
   
   
   def example_task_celery(_dag):
       """Run the sample Trino query through ``exec_trino_query_pod``.
   
       Plain function (no ``@task`` decorator, no explicit queue); it is called
       directly while the DAG file is being evaluated.
   
       :param _dag: DAG object forwarded to ``exec_trino_query_pod`` as ``dag``.
       :return: whatever ``exec_trino_query_pod`` returns (not visible here).
       """
       sample_query = "select * from kudu.dw.dim_gift_brand limit 10"
       options = TrinoStringOptions(query_string=sample_query)
       return exec_trino_query_pod(
           dag=_dag,
           task_id="example_task_celery",
           trino_options=options,
       )
   
   
   # Default arguments handed to the DAG below; the limits and owner come from
   # DagEnum, and the start date is pinned to 2022-10-06 in the KST timezone.
   default_args = dict(
       owner=DagEnum.DE_OWNER.value,
       depends_on_past=False,
       start_date=datetime(2022, 10, 6, tzinfo=KST),
       email_on_failure=False,
       email_on_retry=False,
       execution_timeout=timedelta(hours=int(DagEnum.EXECUTION_TIMEOUT_HOUR_MAX.value)),
       retries=int(DagEnum.RETRIES_MID.value),
       # Failure callback is intentionally left disabled in this reproduction:
       # on_failure_callback=WatchTower().fail,
       retry_delay=timedelta(minutes=int(DagEnum.RETRY_DELAY_MIN.value)),
   )
   
   # The DAG id is this file's base name with every ".py" occurrence removed;
   # the schedule is a cron expression firing every 10 minutes, without catchup.
   with DAG(
       dag_id=os.path.basename(__file__).replace(".py", ""),
       default_args=default_args,
       schedule_interval="*/10 * * * *",
       user_defined_macros={"execution_date_fmt": execution_date_fmt},
       concurrency=int(DagEnum.CONCURRENCY_MID.value),
       max_active_runs=int(DagEnum.MAX_ACTIVE_RUNS_MIN.value),
       catchup=False,
       tags=[
           DatahubEnum.TAG_TRINO.value,
           DatahubEnum.TAG_SPARK.value,
           DatahubEnum.TAG_DISTCP.value,
       ],
   ) as dag:
       # Boundary markers for the pipeline.
       start_task = CustomDummyOperator(task_id="start", dag=dag)
       end_task = CustomDummyOperator(task_id="end", dag=dag)
   
       # Only the undecorated variant is wired in. The @task(queue="kubernetes")
       # variant is kept here, disabled, as the alternative wiring:
       # start_task >> example_task_kubernetes(dag) >> end_task
       query_task = example_task_celery(dag)
       start_task >> query_task >> end_task
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org