Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/20 19:56:32 UTC
[GitHub] [airflow] leahecole opened a new issue, #23129: BigQueryInsertJobOperator fails when there are templated variables in nested dictionaries
leahecole opened a new issue, #23129:
URL: https://github.com/apache/airflow/issues/23129
### Apache Airflow Provider(s)
google
### Versions of Apache Airflow Providers
apache-airflow-providers-google==6.8.0
### Apache Airflow version
2.2.3
### Operating System
n/a
### Deployment
Composer
### Deployment details
Hi! I'm using composer-2.0.6-airflow-2.2.3 - it's a Public IP environment without any configuration overrides. This is a super basic sandbox environment I use for testing.
### What happened
I was experimenting with the BigQueryInsertJobOperator and had a failure when I tried to utilize Airflow variables within a Job configuration.
**Error**
```
google.api_core.exceptions.BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/%7B%7Bvar.value.gcp_project%7D%7D/jobs?prettyPrint=false: Invalid project ID '{{var.value.gcp_project}}'. Project IDs must contain 6-63 lowercase letters, digits, or dashes. Some project IDs also include domain name separated by a colon. IDs must start with a letter and may not end with a dash.
```
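The `%7B%7B...%7D%7D` in the request URL looks like the percent-encoded form of the literal `{{...}}`, which would mean the template string reached the BigQuery client unrendered. A quick check using only Python's standard library (this snippet is an illustration, not part of the original report):

```python
from urllib.parse import quote, unquote

# The template string exactly as written in the DAG file
raw_template = "{{var.value.gcp_project}}"

# Percent-encode it the way a URL path segment would be encoded
encoded = quote(raw_template, safe="")
print(encoded)

# Decoding the path segment from the error message gives the template back
decoded = unquote("%7B%7Bvar.value.gcp_project%7D%7D")
print(decoded == raw_template)
```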
**DAG pseudocode**
(I copy pasted the relevant bits of my DAG)
- `BQ_DESTINATION_TABLE_NAME` and `BQ_DESTINATION_DATASET_NAME` are strings, not Airflow variables, so they're doing great. `WEATHER_HOLIDAYS_JOIN_QUERY` is a SQL query also defined as a string and as far as I can tell is also doing great. `PROJECT_NAME` is using a templated Airflow variable that is defined and is successfully being used in other operators in this and other DAGs.
```python
PROJECT_NAME = '{{var.value.gcp_project}}'

bq_join_holidays_weather_data = bigquery.BigQueryInsertJobOperator(
    task_id="bq_join_holidays_weather_data",
    configuration={
        "query": {
            "query": WEATHER_HOLIDAYS_JOIN_QUERY,
            "useLegacySql": False,
            "destinationTable": {
                "projectId": PROJECT_NAME,
                "datasetId": BQ_DESTINATION_DATASET_NAME,
                "tableId": BQ_DESTINATION_TABLE_NAME,
            },
        },
    },
    location="US",
)
```
**Some things I tried/researched**
I experimented a little with adding `"configuration.query.destination_table": "json"` to [this line](https://github.com/apache/airflow/blob/4fa718e4db2daeb89085ea20e8b3ce0c895e415c/airflow/providers/google/cloud/operators/bigquery.py#L2085) but did not have success. I also checked out the [DataprocSubmitJobOperator](https://github.com/apache/airflow/blob/4fa718e4db2daeb89085ea20e8b3ce0c895e415c/airflow/providers/google/cloud/operators/dataproc.py#L1799) for clues, because I know Dataproc configurations also often have many nested dictionaries and I'm like 90% certain I've templated values there. I had to timebox this, though, because I do have a workaround (just not using the Airflow variable), and I thought I'd open an issue to see if someone who is more familiar with the underlying template rendering might be able to more easily decipher what's happening.
### What you think should happen instead
I think that I should be allowed to use an Airflow variable here 😁
### How to reproduce
Run a `query` job using the `BigQueryInsertJobOperator` that writes the query to a destination table with a fully qualified [`TableReference` object](https://cloud.google.com/bigquery/docs/reference/rest/v2/TableReference) and pass in the `projectId` parameter as a templated Airflow variable
### Anything else
I am willing to submit a PR, but if someone else also wants to, they might be faster than I will, especially between now and the summit
Also, it's been awhile since I submitted an issue and this form is INCREDIBLE well done friends
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] leahecole commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in default args
Posted by GitBox <gi...@apache.org>.
leahecole commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1121281463
> Yeah. The default_args should not contain templated values, I think. Maybe we could improve it in the future, but I think it should not be needed. Maybe a doc update is needed saying that you should not use templates in default_args?
Makes sense. I've been trying to use templated variables everywhere instead of `models.Variable.get` because I know that once upon a time (not sure if still true) it was not great to have `models.Variable.get` outside of the DAG object, because it would make the call to the DB at parse time to populate it. To make code pretty, I just did Jinja templating everywhere, but, technically, `default_args` is part of the DAG object, right? So I don't need to worry about that issue in this specific case?
[GitHub] [airflow] eladkal commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in default args
Posted by GitBox <gi...@apache.org>.
eladkal commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1156646825
> I think it would be possible to answer all those questions but I think currently the answer is " default_args are not processed by JINJA"
I don't think this is true. `default_args` are just "unpacked" in the operator constructor - this is done while parsing the DAG, not at run time.
I used this code:
```
import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    "bash_command": "echo {{ ds }}"
}

with DAG(
    dag_id='testing_default_args',
    schedule_interval='0 0 * * *',
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    default_args=default_args
) as dag:
    run_this = BashOperator(
        task_id='run_after_loop',
    )
```
and it worked fine:
![Screen Shot 2022-06-15 at 18 50 07](https://user-images.githubusercontent.com/45845474/173870949-be123d08-cddb-4a59-9b57-8ef3ca3053cf.png)
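To illustrate the "unpacking" described above, here is a minimal sketch in plain Python. `ToyOperator` is a hypothetical stand-in written for this example and is not Airflow's `BaseOperator`; it only shows the idea that `default_args` are merged into the constructor arguments, so a template string is stored verbatim when the DAG is parsed and would only be rendered later.

```python
class ToyOperator:
    """Hypothetical stand-in for an operator; not Airflow's BaseOperator."""

    def __init__(self, task_id, default_args=None, **kwargs):
        # Explicit keyword arguments take precedence over default_args
        merged = {**(default_args or {}), **kwargs}
        self.task_id = task_id
        self.bash_command = merged.get("bash_command")


default_args = {"bash_command": "echo {{ ds }}"}

# No bash_command passed: the value comes from default_args, still unrendered
task = ToyOperator(task_id="run_after_loop", default_args=default_args)
print(task.bash_command)

# An explicit argument overrides the default
other = ToyOperator(task_id="other", default_args=default_args, bash_command="date")
print(other.bash_command)
```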
@leahecole in your code example you try to template `region` and `project_id` in BigQueryInsertJobOperator, but neither of them is a templated field:
https://github.com/apache/airflow/blob/8e0bddaea69db4d175f03fa99951f6d82acee84d/airflow/providers/google/cloud/operators/bigquery.py#L2070-L2074
Can you please try it with:
```
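from typing import Sequence
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator


class MyBigQueryInsertJobOperator(BigQueryInsertJobOperator):
    template_fields: Sequence[str] = (
        "region",
        "project_id",
    ) + BigQueryInsertJobOperator.template_fields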
```
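Why extending `template_fields` matters can be sketched without Airflow. The classes and the `toy_render` helper below are hypothetical and written only for this illustration (real Airflow uses Jinja and its own rendering code); the point is that only attributes named in `template_fields` get rendered.

```python
import re


def toy_render(text, context):
    """Toy stand-in for Jinja: replaces {{ name }} with context[name]."""
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", lambda m: str(context[m.group(1)]), text)


class ToyInsertJobOperator:
    # Only attributes named here are rendered before execution
    template_fields = ("job_id",)

    def __init__(self, job_id, project_id):
        self.job_id = job_id
        self.project_id = project_id

    def render_template_fields(self, context):
        for name in self.template_fields:
            setattr(self, name, toy_render(getattr(self, name), context))


class MyToyInsertJobOperator(ToyInsertJobOperator):
    # Same pattern as the subclass above: extend the parent's tuple
    template_fields = ("project_id",) + ToyInsertJobOperator.template_fields


context = {"gcp_project": "my-project", "ds": "2022-04-20"}

base = ToyInsertJobOperator(job_id="job_{{ ds }}", project_id="{{ gcp_project }}")
base.render_template_fields(context)

extended = MyToyInsertJobOperator(job_id="job_{{ ds }}", project_id="{{ gcp_project }}")
extended.render_template_fields(context)

print(base.project_id)      # left as the raw template string
print(extended.project_id)  # rendered from the context
```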
[GitHub] [airflow] leahecole commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in default args
Posted by GitBox <gi...@apache.org>.
leahecole commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1156526214
Sorry for late response!! I will be checking this out with my intern this summer.
> I think it would be possible to answer all those questions but I think currently the answer is " default_args are not processed by JINJA" (and I am guessing here - I am not 100% sure) :)
Re this ^ that definitely makes sense given the scope. I'll make a docs update about templating + default args just in case anyone else tries this 😁
[GitHub] [airflow] leahecole commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in default args
Posted by GitBox <gi...@apache.org>.
leahecole commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1171575446
Will do!
[GitHub] [airflow] potiuk commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in default args
Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1121376868
Also @leahecole TIL - while answering someone's question.
I think you COULD use user-defined macros to achieve what you want: user_defined_macros at DAG level
```
user_defined_macros = { "project_id" : PROJECT_ID }
....
task_1( project_id = '{{ project_id }}')
...
task_2( project_id = '{{ project_id }}')
```
[GitHub] [airflow] potiuk commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in default args
Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1121316797
> I've been trying to use templated variables everywhere instead of models.Variable.get because I know that once upon a time (not sure if still true) it was not great to have models.Variable.get outside of the DAG object because it would make the call to the DB at parse time to populate it
It's still the case.
> technically, default_args is part of the DAG object right? So I don't need to worry about that issue in this specific case?
Technically yes, but I think it's not processed by the template engine :). Maybe it could be, but I think it's not, and this is the problem. The template engine processes the parameters with Jinja separately for each task, and the Jinja-templated args are "per task". So processing them with Jinja just before execute() makes sense.
This is a bit different with default args, because they are "per DAG", not "per task". While we could agree that this means that a copy of each default_arg is processed separately by each task, it's not at all obvious, because the scope of default_args is "per DAG" rather than "per task". So one could argue: should the default_args be processed once per DAG, or for every task? Should we also process them in the tasks that do not use them?
Also, I believe default args can be applied at the TaskGroup level, which makes the scope even broader :).
I think it would be possible to answer all those questions but I think currently the answer is " default_args are not processed by JINJA" (and I am guessing here - I am not 100% sure) :)
[GitHub] [airflow] leahecole commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in nested dictionaries
Posted by GitBox <gi...@apache.org>.
leahecole commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1113495083
UPDATE: It's not the nested dictionary - it's the project name being passed in as the default arg
[GitHub] [airflow] leahecole commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in nested dictionaries
Posted by GitBox <gi...@apache.org>.
leahecole commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1108934284
Hi, yep! Here's a complete DAG
```python
import datetime

from airflow import models
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

PROJECT_NAME = '{{var.value.gcp_project}}'

BQ_DATASET_NAME = "bigquery-public-data.ghcn_d.ghcnd_2021"
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"

WEATHER_HOLIDAYS_JOIN_QUERY = f"""
SELECT Holidays.Date, Holiday, id, element, value
FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table WHERE Table.element="TMAX" AND Table.id LIKE "US%") AS Weather
ON Holidays.Date = Weather.Date;
"""

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': PROJECT_NAME,
    'region': '{{ var.value.gce_region }}',
}

with models.DAG(
        'example_bug',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    bq_join_holidays_weather_data = BigQueryInsertJobOperator(
        task_id="bq_join_holidays_weather_data",
        configuration={
            "query": {
                "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                "useLegacySql": False,
                "destinationTable": {
                    "projectId": PROJECT_NAME,
                    "datasetId": BQ_DESTINATION_DATASET_NAME,
                    "tableId": BQ_DESTINATION_TABLE_NAME
                }
            }
        },
        location="US",
    )

    bq_join_holidays_weather_data
```
[GitHub] [airflow] eladkal commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in default args
Posted by GitBox <gi...@apache.org>.
eladkal commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1171518002
Sure, we can do that. Will you start a PR?
In the meantime I'm closing the issue as there is no bug to fix :)
[GitHub] [airflow] eladkal closed issue #23129: BigQueryInsertJobOperator fails when there are templated variables in default args
Posted by GitBox <gi...@apache.org>.
eladkal closed issue #23129: BigQueryInsertJobOperator fails when there are templated variables in default args
URL: https://github.com/apache/airflow/issues/23129
[GitHub] [airflow] potiuk commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in default args
Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1120486577
Yeah. The default_args should not contain templated values, I think. Maybe we could improve it in the future, but I think it should not be needed. Maybe a doc update is needed saying that you should not use templates in default_args?
[GitHub] [airflow] leahecole commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in default args
Posted by GitBox <gi...@apache.org>.
leahecole commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1171467580
Good point @eladkal - it looks like in other BigQuery operators we DO template project ID (for example: https://github.com/apache/airflow/blob/8e0bddaea69db4d175f03fa99951f6d82acee84d/airflow/providers/google/cloud/operators/bigquery.py#L790-L797). I tried your suggestion, and it does work (I did location instead of region, which is what BQ expects). Would it make sense to template project_id for all of the BQ operators?
[GitHub] [airflow] tirkarthi commented on issue #23129: BigQueryInsertJobOperator fails when there are templated variables in nested dictionaries
Posted by GitBox <gi...@apache.org>.
tirkarthi commented on issue #23129:
URL: https://github.com/apache/airflow/issues/23129#issuecomment-1108203063
The test case below works: I pass a variable "gcp_project", and its value is rendered and passed properly during the task instance run. Can you please add a full script or a complete DAG example to reproduce this?
Dictionaries are handled here; templates inside a dictionary are rendered recursively:
https://github.com/apache/airflow/blob/e9f9d33b57220347e0171931ba251368e1d5ee35/airflow/models/abstractoperator.py#L403-L404
```python
@mock.patch('airflow.providers.google.cloud.operators.bigquery.hashlib.md5')
@mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook')
def test_execute_params(self, mock_hook, mock_md5, create_task_instance_of_operator):
    Variable.set(key="gcp_project", value="test_gcp_project")
    job_id = "123456"
    hash_ = "hash"
    real_job_id = f"{job_id}_{hash_}"
    mock_md5.return_value.hexdigest.return_value = hash_

    configuration = {
        "query": {
            "query": "SELECT 1",
        },
        "destinationTable": {
            "projectId": '{{var.value.gcp_project}}',
        }
    }
    rendered_configuration = {
        "query": {
            "query": "SELECT 1",
        },
        "destinationTable": {
            "projectId": 'test_gcp_project',
        }
    }
    mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)

    ti = create_task_instance_of_operator(
        BigQueryInsertJobOperator,
        dag_id=TEST_DAG_ID,
        task_id=TASK_ID,
        configuration=configuration,
        location=TEST_DATASET_LOCATION,
        project_id=TEST_GCP_PROJECT_ID,
        job_id=job_id
    )
    ti.run()

    mock_hook.return_value.insert_job.assert_called_once_with(
        configuration=rendered_configuration,
        location=TEST_DATASET_LOCATION,
        job_id=real_job_id,
        project_id=TEST_GCP_PROJECT_ID,
        retry=DEFAULT_RETRY,
        timeout=None,
    )
```
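The recursive handling of dictionaries mentioned above can be sketched in a few lines of plain Python. This is a simplified illustration, not the code from `abstractoperator.py`: the `render` function and its regex-based substitution of `{{ var.value.<name> }}` are assumptions made for the example, standing in for real Jinja rendering.

```python
import re

# Matches only the {{ var.value.<name> }} form used in this thread
VAR_PATTERN = re.compile(r"\{\{\s*var\.value\.(\w+)\s*\}\}")


def render(value, variables):
    """Render templates in strings, descending into dicts, lists and tuples."""
    if isinstance(value, str):
        return VAR_PATTERN.sub(lambda m: variables[m.group(1)], value)
    if isinstance(value, dict):
        return {key: render(item, variables) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return type(value)(render(item, variables) for item in value)
    # Booleans, numbers, None and so on are returned unchanged
    return value


configuration = {
    "query": {"query": "SELECT 1", "useLegacySql": False},
    "destinationTable": {"projectId": "{{var.value.gcp_project}}"},
}

rendered = render(configuration, {"gcp_project": "test_gcp_project"})
print(rendered["destinationTable"]["projectId"])
```

Because the function returns new containers, the original `configuration` dictionary is left untouched.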