Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/21 17:31:17 UTC

[GitHub] [airflow] Syar123 commented on issue #21714: Unable to pass parameters which aren’t strings while triggering external data in airflow 2.1.4

Syar123 commented on issue #21714:
URL: https://github.com/apache/airflow/issues/21714#issuecomment-1047101944


   def trigger_gcs_to_bq_dag(context, dag_run_obj):
       dag_run_obj.payload = {
           "gcs_bucket": context['ti'].xcom_pull(key='gcs_bucket', task_ids='parse_params'),
           "gcs_objects_list": context['ti'].xcom_pull(key='gcs_objects_list', task_ids='filter_gcs_files'),
           "bq_auto_detect_schema": context['ti'].xcom_pull(key='bq_auto_detect_schema', task_ids='parse_params'),
           "bq_schema": context['ti'].xcom_pull(key='bq_schema', task_ids='parse_params'),
       }
       return dag_run_obj


   trigger_gcs_to_bq_dag = TriggerDagRunOperator(
       task_id='trigger_gcs_to_bq_dag',
       trigger_dag_id="gcs_to_bq",
       python_callable=trigger_gcs_to_bq_dag,
       retries=1,
   )
   
   
   In Airflow 2.1.4, python_callable is no longer supported on TriggerDagRunOperator, so we need to pass the parameters through 'conf' instead of dag_run_obj.payload:
   
   trigger_gcs_to_bq_dag = TriggerDagRunOperator(
       task_id='trigger_gcs_to_bq_dag',
       trigger_dag_id="gcs_to_bq",
       conf={
           "gcs_bucket": "{{ ti.xcom_pull(key='gcs_bucket', task_ids='parse_params') }}",
           "gcs_objects_list": "{{ ti.xcom_pull(key='gcs_objects_list', task_ids='filter_gcs_files') }}",
           "bq_auto_detect_schema": "{{ ti.xcom_pull(key='bq_auto_detect_schema', task_ids='parse_params') }}",
           "bq_schema": "{{ ti.xcom_pull(key='bq_schema', task_ids='parse_params') }}",
       },
       retries=1,
   )
   
   When passed through conf, the parameter values arrive as strings instead of keeping their original types (for example, None arrives as 'None').
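
To make the symptom concrete, here is a minimal sketch that does not use Airflow at all. It assumes string-based template rendering behaves like calling `str()` on each value, which matches what we observe. The helper `render_like_string_template` and the sample XCom values are hypothetical and only stand in for the real tasks.

```python
def render_like_string_template(value):
    # Stand-in for string-based template rendering: the result is always text.
    return str(value)


# Hypothetical values that the upstream tasks might have pushed to XCom.
xcom_values = {
    "gcs_bucket": "example-bucket",
    "bq_auto_detect_schema": None,
    "bq_schema": [["id", "INTEGER"]],
}

# What the triggered DAG would then receive in dag_run.conf.
conf = {key: render_like_string_template(val) for key, val in xcom_values.items()}

# The `is None` check in the triggered DAG no longer matches,
# and the schema is a single string rather than a list of pairs.
schema_is_none = conf["bq_auto_detect_schema"] is None
schema_type = type(conf["bq_schema"]).__name__
```

With values like these, a default that depends on `is None` is never applied, and iterating over `bq_schema` walks over characters instead of field pairs.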
   
   In the external DAG (gcs_to_bq) we use logic like this:
   
   def parse_params(ti, **context):
       if context['dag_run'].conf:
           params['gcs_bucket'] = context['dag_run'].conf.get('gcs_bucket', None)
           params['gcs_objects_list'] = context['dag_run'].conf.get('gcs_objects_list', None)
           params['bq_auto_detect_schema'] = context['dag_run'].conf.get('bq_auto_detect_schema', None)
           params['bq_schema'] = context['dag_run'].conf.get('bq_schema', None)
       else:  # fail without retry
           raise AirflowFailException("config missing. please trigger the dag with proper config")

       # set defaults:
       if params['bq_auto_detect_schema'] is None and params['bq_schema'] is None:
           params['bq_auto_detect_schema'] = True

       if params['bq_schema'] is not None:
           schema_fields = [
               {'name': f[0], 'type': f[1]}
               for f in params['bq_schema']
           ]
   
   
   
   Because the parameters arrive as strings in our external DAG (gcs_to_bq), we have trouble reading values such as None, integers (0, 1), and booleans (true).
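
One possible workaround on the receiving side, sketched here under assumptions rather than offered as the recommended fix, is to convert the strings back into Python objects with the standard library's `ast.literal_eval` before using them. The helper names `coerce_conf_value` and `coerce_conf` are hypothetical and not part of Airflow.

```python
import ast


def coerce_conf_value(value):
    """Turn rendered text such as 'None', 'True', '1' or "[['a', 'b']]" back into a Python object."""
    if not isinstance(value, str):
        return value
    try:
        # literal_eval only accepts Python literals, so it does not execute code.
        return ast.literal_eval(value)
    except (ValueError, SyntaxError, TypeError):
        # Not a Python literal (for example a bucket name): keep the original string.
        return value


def coerce_conf(conf):
    """Apply coerce_conf_value to every entry of a conf-like dict."""
    return {key: coerce_conf_value(val) for key, val in conf.items()}


# Hypothetical conf as it might arrive in the triggered DAG.
received = {
    "gcs_bucket": "example-bucket",
    "bq_auto_detect_schema": "None",
    "bq_schema": "[['id', 'INTEGER']]",
}
params = coerce_conf(received)
```

Two caveats with this approach: a string that happens to look like a literal (a bucket named "123", say) would also be converted, and a lowercase 'true' is not a Python literal, so it stays a string.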
   

