Posted to commits@airflow.apache.org by "Conrad Lee (JIRA)" <ji...@apache.org> on 2019/01/03 09:23:00 UTC

[jira] [Comment Edited] (AIRFLOW-3316) GCS to BQ operator leaves schema_fields operator unset when autodetect=True

    [ https://issues.apache.org/jira/browse/AIRFLOW-3316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732794#comment-16732794 ] 

Conrad Lee edited comment on AIRFLOW-3316 at 1/3/19 9:22 AM:
-------------------------------------------------------------

[~jackjack10] here is the source code in my dag for the task that fails:

 
{code:java}
pageviews_loader = GoogleCloudStorageToBigQueryOperatorUpdated(
  task_id='pageviews_loader',
  bucket='my-bucket',
  source_objects=['/year={{ execution_date.year }}/day={{ ds }}/subfolder/*'],
  destination_project_dataset_table='my_dataset.my_table',
  source_format='PARQUET',
  write_disposition='WRITE_APPEND',
  create_disposition='CREATE_IF_NEEDED',
  time_partitioning={'type':'DAY', 'field':'timestamp'},
  cluster_fields=['visitor_partition', 'company'],
  autodetect=True)

{code}
Note that autodetect is set to True and I don't set schema_fields.

The issue can be confirmed by inspecting the source code [here|https://github.com/apache/incubator-airflow/blob/00b09b8520885cdc1dc5e46d0a5df148e3abb534/airflow/contrib/operators/gcs_to_bq.py#L200-L210].  Because schema_fields is set to None, the execution path enters the first outer 'if' statement on line 200.

The condition in the if clause on line 200 evaluates to False, so the execution path continues to the elif clause on line 208.  That condition also evaluates to False, so the local schema_fields variable is never initialized.
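The dead-end path can be reproduced outside Airflow with a minimal sketch (the FakeOperator class and the stubbed GCS download are hypothetical; only the branching mirrors gcs_to_bq.py):
{code:python}
import json

class FakeOperator:
    # Minimal stand-in for GoogleCloudStorageToBigQueryOperator: only the
    # attributes that the schema branching reads.
    def __init__(self, schema_fields=None, schema_object=None,
                 source_format='PARQUET', autodetect=False):
        self.schema_fields = schema_fields
        self.schema_object = schema_object
        self.source_format = source_format
        self.autodetect = autodetect

    def execute(self):
        # Same branching as gcs_to_bq.py lines 200-210. With schema_fields
        # and schema_object unset and autodetect=True, neither the if nor
        # the elif assigns the local variable, so the final reference
        # raises UnboundLocalError.
        if not self.schema_fields:
            if self.schema_object and self.source_format != 'DATASTORE_BACKUP':
                schema_fields = json.loads('[]')  # stand-in for the GCS download
            elif self.schema_object is None and self.autodetect is False:
                raise ValueError('At least one of `schema_fields`, `schema_object`, '
                                 'or `autodetect` must be passed.')
        else:
            schema_fields = self.schema_fields
        return schema_fields

try:
    FakeOperator(autodetect=True).execute()
except UnboundLocalError as exc:
    print(type(exc).__name__)  # prints UnboundLocalError
{code}
Note that the local schema_fields is only ever bound inside the inner if or the outer else; the autodetect=True path falls through both.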

You pointed out that
{noformat}
schema_fields is optional field. You don't need to assign None. If there is no schema then don't specify this field.{noformat}
As you can see, I don't specify this field, so self.schema_fields ends up as None.  The issue isn't with the self.schema_fields attribute, however, but with the local schema_fields variable (it's a bit confusing that two variables are named so similarly).

 


> GCS to BQ operator leaves schema_fields operator unset when autodetect=True
> ---------------------------------------------------------------------------
>
>                 Key: AIRFLOW-3316
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3316
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: operators
>    Affects Versions: 1.10.1
>            Reporter: Conrad Lee
>            Assignee: Conrad Lee
>            Priority: Minor
>
> When I use the GoogleCloudStorageToBigQueryOperator to load data from Parquet into BigQuery, I leave the schema_fields argument set to 'None' and set autodetect=True.
>  
> This causes the following error: 
>  
> {code:java}
> [2018-11-08 09:42:03,690] {models.py:1736} ERROR - local variable 'schema_fields' referenced before assignment
> Traceback (most recent call last):
>   File "/usr/local/lib/airflow/airflow/models.py", line 1633, in _run_raw_task
>     result = task_copy.execute(context=context)
>   File "/home/airflow/gcs/plugins/bq_operator_updated.py", line 2018, in execute
>     schema_fields=schema_fields
> UnboundLocalError: local variable 'schema_fields' referenced before assignment
> {code}
>  
> The problem is that the set of checks that assigns the schema_fields local variable neglects to cover all the cases:
> {code:java}
> if not self.schema_fields:
>   if self.schema_object and self.source_format != 'DATASTORE_BACKUP':
>     gcs_hook = GoogleCloudStorageHook(
>         google_cloud_storage_conn_id=self.google_cloud_storage_conn_id, 
>         delegate_to=self.delegate_to)
>     schema_fields = json.loads(gcs_hook.download(
>       self.bucket,
>       self.schema_object).decode("utf-8"))
>   elif self.schema_object is None and self.autodetect is False:
>     raise ValueError('At least one of `schema_fields`, `schema_object`, '
>     'or `autodetect` must be passed.')
> else:
>     schema_fields = self.schema_fields
> {code}
> After the `elif` we need to handle the case where autodetect is set to True.  This can be done by simply adding two lines:
> {code:java}
> if not self.schema_fields:
>   if self.schema_object and self.source_format != 'DATASTORE_BACKUP':
>     gcs_hook = GoogleCloudStorageHook(
>         google_cloud_storage_conn_id=self.google_cloud_storage_conn_id, 
>         delegate_to=self.delegate_to)
>     schema_fields = json.loads(gcs_hook.download(
>       self.bucket,
>       self.schema_object).decode("utf-8"))
>   elif self.schema_object is None and self.autodetect is False:
>     raise ValueError('At least one of `schema_fields`, `schema_object`, '
>     'or `autodetect` must be passed.')
>   else:
>     schema_fields = None
> else:
>     schema_fields = self.schema_fields{code}
>  
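The patched branching can be exercised with a standalone sketch (the resolve_schema helper and the stubbed download are hypothetical; the logic follows the two-line fix quoted above):
{code:python}
import json

def resolve_schema(schema_fields, schema_object, source_format,
                   autodetect, download=lambda obj: '[]'):
    # Patched branching: the added `else` assigns None so that, with
    # autodetect=True and no explicit schema, BigQuery's schema
    # autodetection takes over and every path binds the local variable.
    if not schema_fields:
        if schema_object and source_format != 'DATASTORE_BACKUP':
            schema = json.loads(download(schema_object))
        elif schema_object is None and autodetect is False:
            raise ValueError('At least one of `schema_fields`, `schema_object`, '
                             'or `autodetect` must be passed.')
        else:
            schema = None  # autodetect case: let BigQuery infer the schema
    else:
        schema = schema_fields
    return schema

print(resolve_schema(None, None, 'PARQUET', True))  # prints None, no UnboundLocalError
{code}
With the extra else branch, the failing combination from the report (no schema_fields, no schema_object, autodetect=True) yields None rather than raising, while the ValueError guard for the no-schema-no-autodetect case is unchanged.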



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)