You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/25 07:51:17 UTC

[GitHub] [airflow] tirkarthi commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in nested dictionaries

tirkarthi commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1108203063

   The below test case works where I am passing a variable "gcp_project" whose value is rendered and passed properly during task instance run. Can you please add a full script or a complete dag example to reproduce this?
   
   The handling of dictionary is done here where the templates are rendered recursively in case of a dictionary : 
   
   https://github.com/apache/airflow/blob/e9f9d33b57220347e0171931ba251368e1d5ee35/airflow/models/abstractoperator.py#L403-L404
   
   ```python
   @mock.patch('airflow.providers.google.cloud.operators.bigquery.hashlib.md5')
   @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
   def test_execute_params(self, mock_hook, mock_md5, create_task_instance_of_operator):
       Variable.set(key="gcp_project", value="test_gcp_project")
       job_id = "123456"
       hash_ = "hash"
       real_job_id = f"{job_id}_{hash_}"
       mock_md5.return_value.hexdigest.return_value = hash_
   
       configuration = {
           "query": {
               "query": "SELECT 1",
           },
           "destinationTable": {
               "projectId": '{{var.value.gcp_project}}',
           }
       }
   
       rendered_configuration = {
           "query": {
               "query": "SELECT 1",
           },
           "destinationTable": {
               "projectId": 'test_gcp_project',
           }
       }
   
       mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
   
       ti = create_task_instance_of_operator(
           BigQueryInsertJobOperator,
           dag_id=TEST_DAG_ID,
           task_id=TASK_ID,
           configuration=configuration,
           location=TEST_DATASET_LOCATION,
           project_id=TEST_GCP_PROJECT_ID,
           job_id=job_id
       )
       ti.run()
   
       mock_hook.return_value.insert_job.assert_called_once_with(
           configuration=rendered_configuration,
           location=TEST_DATASET_LOCATION,
           job_id=real_job_id,
           project_id=TEST_GCP_PROJECT_ID,
           retry=DEFAULT_RETRY,
           timeout=None,
       )
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org