Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/20 19:56:32 UTC

[GitHub] [airflow] leahecole opened a new issue, #23129: BigQueryInsertJobOperator fails when there are templated variables in nested dictionaries

leahecole opened a new issue, #23129:
URL: https://github.com/apache/airflow/issues/23129

   ### Apache Airflow Provider(s)
   
   google
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-google==6.8.0
   
   ### Apache Airflow version
   
   2.2.3
   
   ### Operating System
   
   n/a
   
   ### Deployment
   
   Composer
   
   ### Deployment details
   
   Hi! I'm using composer-2.0.6-airflow-2.2.3 - it's a Public IP environment without any configuration overrides. This is a super basic sandbox environment I use for testing.
   
   ### What happened
   
   I was experimenting with the BigQueryInsertJobOperator and hit a failure when I tried to use Airflow variables within a job configuration.
   **Error**
   ```
   google.api_core.exceptions.BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/%7B%7Bvar.value.gcp_project%7D%7D/jobs?prettyPrint=false: Invalid project ID '{{var.value.gcp_project}}'. Project IDs must contain 6-63 lowercase letters, digits, or dashes. Some project IDs also include domain name separated by a colon. IDs must start with a letter and may not end with a dash.
   ```
   
   **DAG pseudocode** 
   (I copy-pasted the relevant bits of my DAG.)
   - `BQ_DESTINATION_TABLE_NAME` and `BQ_DESTINATION_DATASET_NAME` are strings, not Airflow variables, so they're doing great. `WEATHER_HOLIDAYS_JOIN_QUERY` is a SQL query, also defined as a string, and as far as I can tell it's also doing great. `PROJECT_NAME` uses a templated Airflow variable that is defined and is successfully used in other operators in this and other DAGs.
   
   ```python
   PROJECT_NAME = '{{var.value.gcp_project}}'

   bq_join_holidays_weather_data = bigquery.BigQueryInsertJobOperator(
       task_id="bq_join_holidays_weather_data",
       configuration={
           "query": {
               "query": WEATHER_HOLIDAYS_JOIN_QUERY,
               "useLegacySql": False,
               "destinationTable": {
                   "projectId": PROJECT_NAME,
                   "datasetId": BQ_DESTINATION_DATASET_NAME,
                   "tableId": BQ_DESTINATION_TABLE_NAME,
               },
           }
       },
       location="US",
   )
   ```
   
   **Some things I tried/researched**
   I experimented a little with adding `"configuration.query.destination_table": "json"` to [this line](https://github.com/apache/airflow/blob/4fa718e4db2daeb89085ea20e8b3ce0c895e415c/airflow/providers/google/cloud/operators/bigquery.py#L2085), but did not have success (sketch below). Additionally, I checked out the [DataprocSubmitJobOperator](https://github.com/apache/airflow/blob/4fa718e4db2daeb89085ea20e8b3ce0c895e415c/airflow/providers/google/cloud/operators/dataproc.py#L1799) to see if I could find some clues, because I know Dataproc configurations also often have many nested dictionaries and I'm fairly certain I've templated values there. I had to timebox this, though, because I do have a workaround (just not using the Airflow variable), and I thought I'd open an issue to see if someone more familiar with the underlying template rendering could decipher what's happening more easily.
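
   Concretely, what I tried looked roughly like this (a sketch - assuming the linked line is the `template_fields_renderers` mapping on `BigQueryInsertJobOperator`; as far as I understand, renderers only control how a templated field is displayed in the UI, not whether it gets rendered, which would explain the lack of success):

   ```python
   # Sketch of the experiment (hypothetical: the exact existing mapping may
   # differ). template_fields_renderers affects UI display of rendered
   # fields only, so adding a key here does not make a new field templated.
   template_fields_renderers = {
       "configuration": "json",
       "configuration.query.destination_table": "json",
   }
   ```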
   
   ### What you think should happen instead
   
   I think that I should be allowed to use an Airflow variable here 😁 
   
   ### How to reproduce
   
   Run a `query` job using the `BigQueryInsertJobOperator` that writes the query results to a destination table with a fully qualified [`TableReference` object](https://cloud.google.com/bigquery/docs/reference/rest/v2/TableReference), and pass in the `projectId` parameter as a templated Airflow variable.
   
   ### Anything else
   
   I am willing to submit a PR, but if someone else wants to, they might be faster than I would be, especially between now and the summit.
   
   Also, it's been a while since I submitted an issue, and this form is INCREDIBLE - well done, friends!
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   




[GitHub] [airflow] leahecole commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in default args

leahecole commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1121281463

   > Yeah. The default_args should not contain templatable values, I think - maybe we could improve it in the future, but I think it should not be needed. Maybe a doc update is needed to say that you should not use templates in default_args?
   
   Makes sense. I've been trying to use templated variables everywhere instead of `models.Variable.get`, because I know that once upon a time (not sure if it's still true) it was not great to call `models.Variable.get` outside of the DAG object, since it would hit the DB at parse time to populate the value. To make the code pretty, I just did Jinja templating everywhere. But technically `default_args` is part of the DAG object, right? So I don't need to worry about that issue in this specific case?
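
   For context, the two patterns I'm weighing look roughly like this (a minimal sketch; `gcp_project` is just the name of my variable):

   ```python
   from airflow import models

   # Parse-time fetch: hits the metadata DB every time the scheduler
   # parses this file - the pattern that used to be discouraged.
   project = models.Variable.get("gcp_project")

   # Jinja template: just a plain string at parse time; it is only
   # resolved at task runtime, and only for templated fields.
   project = "{{ var.value.gcp_project }}"
   ```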




[GitHub] [airflow] eladkal commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in default args

eladkal commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1156646825

   > I think it would be possible to answer all those questions, but I think currently the answer is "default_args are not processed by Jinja"
   
   I don't think this is true. `default_args` are just "unpacked" in the operator constructor - this happens while parsing the DAG, not at run time.
   
   I used this code:
   ```python
   import pendulum
   
   from airflow import DAG
   from airflow.operators.bash import BashOperator
   
   default_args = {
       "bash_command": "echo {{ ds }}"
   }
   
   with DAG(
       dag_id='testing_default_args',
       schedule_interval='0 0 * * *',
       start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
       catchup=False,
       default_args=default_args
   ) as dag:
   
       run_this = BashOperator(
           task_id='run_after_loop',
       )
   ```
   
   
   and it worked fine:
   ![Screen Shot 2022-06-15 at 18 50 07](https://user-images.githubusercontent.com/45845474/173870949-be123d08-cddb-4a59-9b57-8ef3ca3053cf.png)
   
   
   
   @leahecole in your code example you try to template `region` and `project_id` in `BigQueryInsertJobOperator`, but neither is a templated field:
   
   https://github.com/apache/airflow/blob/8e0bddaea69db4d175f03fa99951f6d82acee84d/airflow/providers/google/cloud/operators/bigquery.py#L2070-L2074
   
   
   Can you please try it with:
   
   ```python
   from typing import Sequence

   from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator


   class MyBigQueryInsertJobOperator(BigQueryInsertJobOperator):
       template_fields: Sequence[str] = (
           "region",
           "project_id",
       ) + BigQueryInsertJobOperator.template_fields
   ```
   




[GitHub] [airflow] leahecole commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in default args

leahecole commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1156526214

   Sorry for the late response!! I will be checking this out with my intern this summer.
   
   > I think it would be possible to answer all those questions, but I think currently the answer is "default_args are not processed by Jinja" (and I am guessing here - I am not 100% sure) :)
   
   Re this ^: that definitely makes sense given the scope. I'll make a docs update about templating + default_args, just in case anyone else tries this 😁




[GitHub] [airflow] leahecole commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in default args

leahecole commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1171575446

   Will do! 




[GitHub] [airflow] potiuk commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in default args

potiuk commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1121376868

   Also @leahecole, TIL while answering someone else's question:

   I think you COULD use user-defined macros to achieve what you want: `user_defined_macros` at the DAG level.
   
   ```
   user_defined_macros = {"project_id": PROJECT_ID}  # passed to DAG(user_defined_macros=...)

   ...

   task_1(project_id='{{ project_id }}')

   ...

   task_2(project_id='{{ project_id }}')
   ```
   
   




[GitHub] [airflow] potiuk commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in default args

potiuk commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1121316797

   > I've been trying to use templated variables everywhere instead of models.Variable.get, because I know that once upon a time (not sure if it's still true) it was not great to call models.Variable.get outside of the DAG object, since it would hit the DB at parse time to populate the value
   
   It's still the case.
   
   > technically, default_args is part of the DAG object right? So I don't need to worry about that issue in this specific case?
   
   Technically yes, but I think it's not processed by the template engine :). Maybe it could be, but I think it's not, and this is the problem. The template engine processes parameters with Jinja separately for each task, and the Jinja-templated args are "per task", so processing them with Jinja just before execute() makes sense.

   This is a bit different with default args, because they are "per dag", not "per task". While we could agree that this means a copy of each default_arg is separately processed by each task, it's not at all obvious, because the scope of the default_args is "per dag" rather than "per task". So one could argue: should a default_arg be processed once per dag? Or for every task? Should we process them also in the tasks that do not use them?

   Also, I believe default args can be applied at the TaskGroup level - which makes the scope even broader :).

   I think it would be possible to answer all those questions, but I think currently the answer is "default_args are not processed by Jinja" (and I am guessing here - I am not 100% sure) :)
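
   For the avoidance of doubt, the "per task" rendering I mean is roughly this (a simplified sketch, not the actual Airflow internals - the real logic lives on the task instance and also recurses into dicts and lists):

   ```python
   import jinja2

   # Hypothetical helper: render only the attributes an operator declares
   # in template_fields, once per task instance, just before execute().
   def render_task_templates(task, context, jinja_env=None):
       jinja_env = jinja_env or jinja2.Environment()
       for field_name in task.template_fields:
           value = getattr(task, field_name)
           if isinstance(value, str):
               rendered = jinja_env.from_string(value).render(**context)
               setattr(task, field_name, rendered)
   ```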
   




[GitHub] [airflow] leahecole commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in nested dictionaries

leahecole commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1113495083

   UPDATE: It's not the nested dictionary - it's the project name being passed in as the default arg
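
   In other words, a minimal illustration (using the names from my DAG elsewhere in this thread):

   ```python
   PROJECT_NAME = "{{ var.value.gcp_project }}"

   # Fine: `configuration` is a templated field of BigQueryInsertJobOperator,
   # and templates nested inside dicts are rendered recursively.
   configuration = {"query": {"destinationTable": {"projectId": PROJECT_NAME}}}

   # The problem: `project_id` arriving via default_args is not a templated
   # field, so the literal "{{ var.value.gcp_project }}" reaches the API.
   default_args = {"project_id": PROJECT_NAME}
   ```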




[GitHub] [airflow] leahecole commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in nested dictionaries

leahecole commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1108934284

   Hi, yep! Here's a complete DAG
   ```python
   
   import datetime
   
   from airflow import models
   from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
   
   
   
   PROJECT_NAME = '{{var.value.gcp_project}}'
   
    BQ_DATASET_NAME = "bigquery-public-data.ghcn_d.ghcnd_2021"
    BQ_DESTINATION_DATASET_NAME = "holiday_weather"
    BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
   
   
   WEATHER_HOLIDAYS_JOIN_QUERY = f"""
   SELECT Holidays.Date, Holiday, id, element, value
   FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
    JOIN (SELECT id, date, element, value FROM `{BQ_DATASET_NAME}` AS Table WHERE Table.element="TMAX" AND Table.id LIKE "US%") AS Weather
   ON Holidays.Date = Weather.Date;
   """
   
   yesterday = datetime.datetime.combine(
       datetime.datetime.today() - datetime.timedelta(1),
       datetime.datetime.min.time())
   
   default_dag_args = {
       'start_date': yesterday,
       'email_on_failure': False,
       'email_on_retry': False,
       'retries': 1,
       'retry_delay': datetime.timedelta(minutes=5),
       'project_id': PROJECT_NAME,
       'region': '{{ var.value.gce_region }}',
   
   }
   
   with models.DAG(
           'example_bug',
           # Continue to run DAG once per day
           schedule_interval=datetime.timedelta(days=1),
           default_args=default_dag_args) as dag:
   
       bq_join_holidays_weather_data = BigQueryInsertJobOperator(
           task_id="bq_join_holidays_weather_data",
           configuration={
               "query": {
                   "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                   "useLegacySql": False,
                   "destinationTable": {
                           "projectId": PROJECT_NAME,
                           "datasetId": BQ_DESTINATION_DATASET_NAME,
                           "tableId": BQ_DESTINATION_TABLE_NAME
                       }
               }
           },
           location="US", 
       )
   ```




[GitHub] [airflow] eladkal commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in default args

eladkal commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1171518002

   Sure, we can do that - will you start a PR?
   In the meantime I'm closing the issue, as there is no bug to fix :)




[GitHub] [airflow] eladkal closed issue #23129: BigQueryInsertJobOperator fails when there are templated variables in default args

eladkal closed issue #23129: BigQueryInsertJobOperator fails when there are templated variables in default args
URL: https://github.com/apache/airflow/issues/23129




[GitHub] [airflow] potiuk commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in default args

potiuk commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1120486577

   Yeah. The default_args should not contain templatable values, I think - maybe we could improve it in the future, but I think it should not be needed. Maybe a doc update is needed to say that you should not use templates in default_args?




[GitHub] [airflow] leahecole commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in default args

leahecole commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1171467580

   Good point @eladkal - it looks like in other BigQuery operators we DO template project ID (for example: https://github.com/apache/airflow/blob/8e0bddaea69db4d175f03fa99951f6d82acee84d/airflow/providers/google/cloud/operators/bigquery.py#L790-L797). I tried your suggestion and it does work (I used location instead of region, which is what BQ expects; see below). Would it make sense to template project_id for all of the BQ operators?
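
   For reference, the variant that worked for me looks like this (same idea as your snippet, with `location` swapped in for `region`):

   ```python
   from typing import Sequence

   from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator


   class MyBigQueryInsertJobOperator(BigQueryInsertJobOperator):
       template_fields: Sequence[str] = (
           "location",
           "project_id",
       ) + BigQueryInsertJobOperator.template_fields
   ```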




[GitHub] [airflow] tirkarthi commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in nested dictionaries

tirkarthi commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1108203063

   The test case below works: I pass a variable "gcp_project" whose value is rendered and passed through properly during the task instance run. Can you please add a full script or a complete DAG example to reproduce this?

   Dictionaries are handled here, where the templates are rendered recursively:
   
   https://github.com/apache/airflow/blob/e9f9d33b57220347e0171931ba251368e1d5ee35/airflow/models/abstractoperator.py#L403-L404
   
   ```python
   @mock.patch('airflow.providers.google.cloud.operators.bigquery.hashlib.md5')
   @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
   def test_execute_params(self, mock_hook, mock_md5, create_task_instance_of_operator):
       Variable.set(key="gcp_project", value="test_gcp_project")
       job_id = "123456"
       hash_ = "hash"
       real_job_id = f"{job_id}_{hash_}"
       mock_md5.return_value.hexdigest.return_value = hash_
   
       configuration = {
           "query": {
               "query": "SELECT 1",
           },
           "destinationTable": {
               "projectId": '{{var.value.gcp_project}}',
           }
       }
   
       rendered_configuration = {
           "query": {
               "query": "SELECT 1",
           },
           "destinationTable": {
               "projectId": 'test_gcp_project',
           }
       }
   
       mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
   
       ti = create_task_instance_of_operator(
           BigQueryInsertJobOperator,
           dag_id=TEST_DAG_ID,
           task_id=TASK_ID,
           configuration=configuration,
           location=TEST_DATASET_LOCATION,
           project_id=TEST_GCP_PROJECT_ID,
           job_id=job_id
       )
       ti.run()
   
       mock_hook.return_value.insert_job.assert_called_once_with(
           configuration=rendered_configuration,
           location=TEST_DATASET_LOCATION,
           job_id=real_job_id,
           project_id=TEST_GCP_PROJECT_ID,
           retry=DEFAULT_RETRY,
           timeout=None,
       )
   ```

