Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/07 08:11:50 UTC

[GitHub] [airflow] tirkarthi commented on issue #22782: ExternalTaskSensor does not properly expand templates in external_task_id(s)

tirkarthi commented on issue #22782:
URL: https://github.com/apache/airflow/issues/22782#issuecomment-1091265551

   Even in the `using_external_task_id` example the value is not rendered, because the poke method always logs `self.external_task_ids`. When `external_task_id` is passed to the constructor, `external_task_ids` is set to a one-element list containing that value. The list therefore holds the unrendered template string, and that string is what gets logged in the poke method. As for making `external_task_ids` a template field: the constructor runs some validations on `external_task_ids`, and these may not be meaningful for templated values, since values are only rendered when the task runs.
   
   https://github.com/apache/airflow/blob/0592bfd85631ed3109d68c8ec9aa57f0465d90b3/airflow/sensors/external_task.py#L119-L120
   
   https://github.com/apache/airflow/blob/0592bfd85631ed3109d68c8ec9aa57f0465d90b3/airflow/sensors/external_task.py#L162-L167
   
   https://github.com/apache/airflow/blob/0592bfd85631ed3109d68c8ec9aa57f0465d90b3/airflow/sensors/external_task.py#L122-L129
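   
   The behaviour described above can be reproduced without Airflow. The following is only a minimal sketch: `SketchSensor` and `render` are hypothetical stand-ins, the renderer is a toy string replacement rather than Jinja, and it assumes that only `external_dag_id` and `external_task_id` are listed as template fields.

```python
def render(text, context):
    """Toy renderer: replaces '{{ key }}' with context[key]. Not Jinja."""
    for key, value in context.items():
        text = text.replace("{{ " + key + " }}", str(value))
    return text


class SketchSensor:
    """Hypothetical stand-in for a sensor; not Airflow's ExternalTaskSensor."""

    # Assumption: only these attributes are rendered before the task runs.
    template_fields = ("external_dag_id", "external_task_id")

    def __init__(self, external_dag_id, external_task_id=None, external_task_ids=None):
        self.external_dag_id = external_dag_id
        self.external_task_id = external_task_id
        # Constructor-time copy: the list captures the raw template string.
        if external_task_id is not None:
            external_task_ids = [external_task_id]
        self.external_task_ids = external_task_ids

    def render_template_fields(self, context):
        # Rendering touches the named attributes only, not the derived list.
        for name in self.template_fields:
            value = getattr(self, name)
            if isinstance(value, str):
                setattr(self, name, render(value, context))

    def poke_message(self):
        # Mirrors the point in the comment: the list is what gets logged.
        return f"Poking for tasks {self.external_task_ids} in dag {self.external_dag_id}"


sensor = SketchSensor("{{ dag_id }}", external_task_id="{{ task_id }}")
sensor.render_template_fields({"dag_id": "unit_test_dag", "task_id": "time_sensor_check"})
message = sensor.poke_message()
print(message)
```

   In this sketch `external_task_id` and `external_dag_id` end up rendered, while the list built in the constructor still contains the literal `{{ task_id }}`, so the logged message shows the unrendered value.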
   
   Test case for the issue. It hangs without producing any output.
   
   ```python
       def test_external_task_sensor_params(self):
           self.test_time_sensor()
           op = ExternalTaskSensor(
               task_id='test_external_task_sensor_check',
               external_dag_id='{{ params.dag_id }}',
               external_task_id='{{ params.task_id }}',
               params={
                   'dag_id': TEST_DAG_ID,
                   'task_id': TEST_TASK_ID,
               },
               dag=self.dag,
           )

           with self.assertLogs(op.log, level=logging.INFO) as cm:
               op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
               assert (
                   f"INFO:airflow.task.operators:Poking for tasks ['{TEST_TASK_ID}'] in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... " in cm.output
               )
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
