Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/05/19 18:38:43 UTC

[GitHub] [airflow] potiuk commented on a change in pull request #8858: Refactor BigQuery operators

potiuk commented on a change in pull request #8858:
URL: https://github.com/apache/airflow/pull/8858#discussion_r427515977



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -942,6 +968,15 @@ def __init__(self,
         self.schema_object = schema_object
 
         # BQ config
+        if not table_resource:

Review comment:
      I agree. The behaviour is a bit undefined if the wrong combination of parameters is specified. If table_resource is undefined while schema_fields, source_objects, and bucket are defined, I am not sure what happens here. I think all the "superfluous" combinations where the same data is provided by different means should raise an exception.
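
      For illustration, a minimal sketch of the kind of mutual-exclusion check being asked for here, assuming the check lives in a small helper and raises a plain ValueError (the real operator may validate differently):

          def _validate_table_inputs(table_resource, schema_fields, source_objects, bucket):
              # Hypothetical helper: reject "superfluous" combinations where the same
              # table definition is supplied both via table_resource and via the
              # individual keyword arguments.
              legacy = {
                  "schema_fields": schema_fields,
                  "source_objects": source_objects,
                  "bucket": bucket,
              }
              also_provided = [name for name, value in legacy.items() if value]
              if table_resource and also_provided:
                  raise ValueError(
                      "Provide either `table_resource` or the individual parameters "
                      f"({', '.join(also_provided)}), not both."
                  )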

##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -1222,22 +1265,21 @@ class BigQueryGetDatasetTablesOperator(BaseOperator):
         For this to work, the service account making the request must have domain-wide
         delegation enabled.
     :type delegate_to: str
-
-    :rtype: dict
-        .. seealso:: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/list#response-body
     """
     template_fields = ('dataset_id', 'project_id')
     ui_color = BigQueryUIColors.DATASET.value
 
     @apply_defaults
-    def __init__(self,
-                 dataset_id: str,
-                 project_id: Optional[str] = None,
-                 max_results: Optional[int] = None,
-                 page_token: Optional[str] = None,
-                 gcp_conn_id: Optional[str] = 'google_cloud_default',
-                 delegate_to: Optional[str] = None,
-                 *args, **kwargs) -> None:
+    def __init__(
+        self,
+        dataset_id: str,
+        project_id: Optional[str] = None,
+        max_results: Optional[int] = None,
+        page_token: Optional[str] = None,

Review comment:
       Agree. We should iterate over all pages here
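
      As a rough sketch of "iterate over all pages": the google-cloud-bigquery client already returns a paging iterator from list_tables, so the caller never has to thread page_token through by hand; how the operator would wire this up through BigQueryHook is an assumption here:

          from google.cloud import bigquery

          def list_all_dataset_tables(project_id: str, dataset_id: str) -> list:
              # Sketch only: the real operator would obtain the client via BigQueryHook.
              client = bigquery.Client(project=project_id)
              # list_tables() returns an iterator that transparently fetches every
              # result page, so no explicit page_token handling is required.
              return [table.table_id for table in client.list_tables(dataset_id)]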

##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -942,6 +968,15 @@ def __init__(self,
         self.schema_object = schema_object
 
         # BQ config
+        if not table_resource:
+            warnings.warn(
+                "Passing table parameters via key words arguments will be deprecated. "

Review comment:
      By when will it be deprecated?
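
      Purely as an illustration of answering that question in the warning itself, a deprecation message can name its removal target; the wording and version policy below are assumptions, not what this PR ships:

          import warnings

          warnings.warn(
              # Hypothetical wording: state both the replacement and the removal target.
              "Passing table parameters via keyword arguments is deprecated and will be "
              "removed in a future major release of the provider; use `table_resource` instead.",
              DeprecationWarning,
              stacklevel=2,
          )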

##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -963,24 +998,35 @@ def __init__(self,
         self.location = location
 
     def execute(self, context):
-        bq_hook = BigQueryHook(gcp_conn_id=self.bigquery_conn_id,
-                               delegate_to=self.delegate_to,
-                               location=self.location)
+        bq_hook = BigQueryHook(
+            gcp_conn_id=self.bigquery_conn_id,
+            delegate_to=self.delegate_to,
+            location=self.location
+        )
 
         if not self.schema_fields and self.schema_object and self.source_format != 'DATASTORE_BACKUP':
             gcs_hook = GCSHook(
                 google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
                 delegate_to=self.delegate_to)
-            schema_fields = json.loads(gcs_hook.download(
-                self.bucket,
-                self.schema_object).decode("utf-8"))
+            schema_object = gcs_hook.download(self.bucket, self.schema_object)
+            schema_fields = json.loads(schema_object.decode("utf-8"))
         else:
             schema_fields = self.schema_fields
 
-        source_uris = ['gs://{}/{}'.format(self.bucket, source_object)
-                       for source_object in self.source_objects]
+        if schema_fields and self.table_resource:
+            self.table_resource["externalDataConfiguration"]["schema"] = schema_fields
 
-        try:
+        source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects]

Review comment:
      I think this should be moved to the else: branch of the if below. Otherwise the string formatting here will always happen, even if table_resource is present.
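
      A small sketch of the restructuring suggested here, so the gs:// URI formatting only runs when no table_resource was supplied (names follow the diff above; the surrounding hook calls are omitted):

          def _build_source_uris(bucket, source_objects, table_resource):
              # If a full table_resource is given, it already carries the external data
              # configuration, so skip building the URIs entirely.
              if table_resource:
                  return None
              return [f"gs://{bucket}/{source_object}" for source_object in source_objects]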



