Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/06/13 18:55:42 UTC

[GitHub] [airflow] turbaszek commented on a diff in pull request #24416: Use insert_job in the BigQueryToGCPOpertor and adjust links

turbaszek commented on code in PR #24416:
URL: https://github.com/apache/airflow/pull/24416#discussion_r896036850


##########
airflow/providers/google/cloud/operators/bigquery.py:
##########
@@ -2167,16 +2156,34 @@ def execute(self, context: Any):
                     f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
                 )
 
-        if "query" in job.to_api_repr()["configuration"]:
-            if "destinationTable" in job.to_api_repr()["configuration"]["query"]:
-                table = job.to_api_repr()["configuration"]["query"]["destinationTable"]
-                BigQueryTableLink.persist(
-                    context=context,
-                    task_instance=self,
-                    dataset_id=table["datasetId"],
-                    project_id=table["projectId"],
-                    table_id=table["tableId"],
-                )
+        job_types = {
+            LoadJob._JOB_TYPE: ["sourceTable", "destinationTable"],
+            CopyJob._JOB_TYPE: ["sourceTable", "destinationTable"],
+            ExtractJob._JOB_TYPE: ["sourceTable"],
+            QueryJob._JOB_TYPE: ["destinationTable"],
+        }
+
+        for job_type, tables_prop in job_types.items():
+            if job_type in job.to_api_repr()["configuration"]:
+                for table_prop in tables_prop:
+                    if table_prop in job.to_api_repr()["configuration"][job_type]:
+                        table = job.to_api_repr()["configuration"][job_type][table_prop]
+                        if self.project_id:
+                            if isinstance(table, str):
+                                BigQueryTableLink.persist(
+                                    context=context,
+                                    task_instance=self,
+                                    project_id=self.project_id,
+                                    table_id=table,
+                                )
+                            else:
+                                BigQueryTableLink.persist(
+                                    context=context,
+                                    task_instance=self,
+                                    dataset_id=table["datasetId"],
+                                    project_id=self.project_id,
+                                    table_id=table["tableId"],
+                                )

Review Comment:
   Consider inverting the conditions and using `continue` to keep the code flat. We don't have pylint, but too much indentation is not good 😉 
   
   Also, the `if not self.project_id:` check can be moved outside the loop: if the project ID is missing, there is no need to do any work, right?
   
   Apart from that, we can introduce `job_configuration` so that `job.to_api_repr()` isn't called repeatedly.
   
   ```suggestion
               job_configuration = job.to_api_repr()["configuration"]
               if job_type not in job_configuration:
                   continue
               for table_prop in tables_prop:
                   if table_prop not in job_configuration[job_type]:
                       continue
                   table = job_configuration[job_type][table_prop]
   
                   if isinstance(table, str):
                       BigQueryTableLink.persist(
                           context=context,
                           task_instance=self,
                           project_id=self.project_id,
                           table_id=table,
                       )
                   else:
                       BigQueryTableLink.persist(
                           context=context,
                           task_instance=self,
                           dataset_id=table["datasetId"],
                           project_id=self.project_id,
                           table_id=table["tableId"],
                       )
   ```
   
   Additionally I would do:
   ```
   persist_kwargs = {
       "context": context,
       "task_instance": self,
       "project_id": self.project_id,
       "table_id": table,
   }
   if not isinstance(table, str):
       persist_kwargs["table_id"] = table["tableId"]
       persist_kwargs["dataset_id"] = table["datasetId"]
   
   BigQueryTableLink.persist(**persist_kwargs)
   ```
   instead of calling `BigQueryTableLink.persist` twice.
   
   WDYT?
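
For reference, here is a minimal, self-contained sketch of how the three suggestions above (early exit on a missing project ID, guard clauses with `continue`, and a single `persist` call) might fit together. It does not import Airflow: `persist` is a hypothetical stand-in for `BigQueryTableLink.persist` that only records its keyword arguments, `persist_table_links` is an illustrative helper name, and the string keys in `JOB_TYPES` are assumed to match the `_JOB_TYPE` values used in the diff.

```python
from typing import Any, Dict, List, Optional

# Hypothetical stand-in for BigQueryTableLink.persist: it only records kwargs.
persisted: List[Dict[str, Any]] = []


def persist(**kwargs: Any) -> None:
    persisted.append(kwargs)


# Job type -> table properties to inspect, mirroring the dict in the diff.
# Assumption: these strings match the private `_JOB_TYPE` attributes.
JOB_TYPES = {
    "load": ["sourceTable", "destinationTable"],
    "copy": ["sourceTable", "destinationTable"],
    "extract": ["sourceTable"],
    "query": ["destinationTable"],
}


def persist_table_links(
    job_configuration: Dict[str, Any],
    project_id: Optional[str],
    context: Any = None,
    task_instance: Any = None,
) -> None:
    # Without a project ID there is nothing to link, so exit before looping.
    if not project_id:
        return

    for job_type, table_props in JOB_TYPES.items():
        if job_type not in job_configuration:
            continue
        for table_prop in table_props:
            if table_prop not in job_configuration[job_type]:
                continue
            table = job_configuration[job_type][table_prop]

            # Build the kwargs once; only the table reference format differs.
            persist_kwargs = {
                "context": context,
                "task_instance": task_instance,
                "project_id": project_id,
                "table_id": table,
            }
            if not isinstance(table, str):
                persist_kwargs["table_id"] = table["tableId"]
                persist_kwargs["dataset_id"] = table["datasetId"]

            persist(**persist_kwargs)


# Usage: a query job whose destination table is given as a dict.
config = {"query": {"destinationTable": {"datasetId": "ds", "tableId": "tbl"}}}
persist_table_links(config, project_id="my-project")
```

In the operator itself, `context` and `self` would be passed through as in the suggestion; the standalone function here only exists so the control flow can be run in isolation.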


