Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/29 03:05:35 UTC

[GitHub] [airflow] patricker opened a new pull request, #26768: Fix MaxID logic for GCSToBigQueryOperator

patricker opened a new pull request, #26768:
URL: https://github.com/apache/airflow/pull/26768

   related: #26283 
   closes: #26767
   
   The `max_id_key` parameter, when used, causes an XCom serialization error when the value is read back out of XCom. This happens because, instead of storing a single column value in XCom, we were accidentally storing the entire `Row`.
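A minimal sketch of the failure mode, assuming a JSON-based XCom backend. `FakeRow` is a hypothetical stand-in for BigQuery's `Row` (indexable, but not JSON-serializable), and `xcom_push_json` and `extract_max_id` are hypothetical helpers, not Airflow or BigQuery APIs. Storing the whole row object fails to serialize, while storing only its first column works.

```python
import json
from typing import Any, Iterable, Optional


class FakeRow:
    """Hypothetical stand-in for google.cloud.bigquery.Row: indexable, not JSON-serializable."""

    def __init__(self, *values: Any) -> None:
        self._values = values

    def __getitem__(self, index: int) -> Any:
        return self._values[index]


def xcom_push_json(value: Any) -> str:
    """Hypothetical stand-in for a JSON-based XCom backend; raises TypeError on unsupported types."""
    return json.dumps(value)


def extract_max_id(rows: Iterable[FakeRow]) -> Optional[Any]:
    """Return the single column value of the first row, or None if there are no rows."""
    row = next(iter(rows), None)
    return None if row is None else row[0]


rows = [FakeRow(42)]
print(xcom_push_json(extract_max_id(rows)))  # the scalar value serializes cleanly
try:
    xcom_push_json(rows[0])  # the whole row object does not
except TypeError as exc:
    print(f"serialization failed: {exc}")
```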
   
   The unit test was updated to reflect the return type of `get_job().result()`. That call actually returns a row iterator, but returning a list of `Row` objects works well enough for the test.
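A sketch of the mocking approach described above, using the standard `unittest.mock` module: the mocked `get_job().result()` returns a plain list, which is enough for code that only iterates and indexes. `read_max_id` is a hypothetical helper that mirrors the operator logic, and tuples stand in for `Row` objects.

```python
from typing import Any, Optional
from unittest import mock


def read_max_id(bq_hook: Any, job_id: str, location: str) -> Optional[Any]:
    """Hypothetical helper: return column 0 of the first result row, or None if empty."""
    result = bq_hook.get_job(job_id=job_id, location=location).result()
    row = next(iter(result), None)
    return None if row is None else row[0]


# Mock the hook so that get_job(...).result() returns a list of row-like tuples
# instead of a real iterator backed by BigQuery.
bq_hook = mock.MagicMock()
bq_hook.get_job.return_value.result.return_value = [(100,)]

print(read_max_id(bq_hook, job_id="job_123", location="US"))
bq_hook.get_job.assert_called_once_with(job_id="job_123", location="US")
```

Because the helper only iterates over the result, the same code path works for a list in tests and for a lazy iterator in production.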


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr commented on a diff in pull request #26768: Fix MaxID logic for GCSToBigQueryOperator

uranusjr commented on code in PR #26768:
URL: https://github.com/apache/airflow/pull/26768#discussion_r983022673


##########
airflow/providers/google/cloud/transfers/gcs_to_bigquery.py:
##########
@@ -319,9 +319,10 @@ def execute(self, context: Context):
                     location=self.location,
                     use_legacy_sql=False,
                 )
-            row = list(bq_hook.get_job(job_id=job_id, location=self.location).result())
-            if row:
-                max_id = row[0] if row[0] else 0
+            result = list(bq_hook.get_job(job_id=job_id, location=self.location).result())
+            if result and len(result) > 0:
+                row = result[0]

Review Comment:
   ```suggestion
               result = bq_hook.get_job(job_id=job_id, location=self.location).result()
               row = next(iter(result), None)
               if row is not None:
   ```
   
   This should be more efficient, since it reads only the first row instead of materializing the whole result into a list.
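To illustrate why `next(iter(...), None)` can be cheaper than `list(...)`, the sketch below uses a hypothetical generator, `counting_rows`, that records every row it produces. `list()` drains the whole iterator, while `next()` pulls only the first element. With BigQuery's paged results the actual saving depends on page size, and a `MAX()` query returns a single row anyway, so treat this as an illustration of the idiom rather than a benchmark.

```python
from typing import Iterator, List, Tuple


def counting_rows(n: int, produced: List[int]) -> Iterator[Tuple[int]]:
    """Hypothetical lazy row source that records each row as it is produced."""
    for i in range(n):
        produced.append(i)
        yield (i,)


# Old approach: materialize every row, then look at the first one.
produced_by_list: List[int] = []
rows = list(counting_rows(1000, produced_by_list))
first_from_list = rows[0] if rows else None

# Suggested approach: pull only the first row, defaulting to None when empty.
produced_by_next: List[int] = []
first_from_next = next(iter(counting_rows(1000, produced_by_next)), None)

print(len(produced_by_list), len(produced_by_next))  # 1000 1
```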



[GitHub] [airflow] bhirsz commented on a diff in pull request #26768: Fix MaxID logic for GCSToBigQueryOperator

bhirsz commented on code in PR #26768:
URL: https://github.com/apache/airflow/pull/26768#discussion_r983170056


##########
airflow/providers/google/cloud/transfers/gcs_to_bigquery.py:
##########
@@ -319,9 +319,10 @@ def execute(self, context: Context):
                     location=self.location,
                     use_legacy_sql=False,
                 )
-            row = list(bq_hook.get_job(job_id=job_id, location=self.location).result())
-            if row:
-                max_id = row[0] if row[0] else 0
+            result = bq_hook.get_job(job_id=job_id, location=self.location).result()
+            row = next(iter(result), None)
+            if row is not None:

Review Comment:
   nit: You can invert the `if` to reduce indentation:
   ```
   if row is None:
       raise RuntimeError(f"The {select_command} returned no rows!")
   max_id = row[0]
   self.log.info(
       'Loaded BQ data with max %s.%s=%s',
       self.destination_project_dataset_table,
       self.max_id_key,
       max_id,
   )
   return max_id
   ```
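A self-contained sketch of the inverted-`if` (guard clause) pattern from the suggestion: the empty case exits early with an exception, so the normal path stays at one indentation level. The function name `max_id_or_raise` and its parameters are hypothetical stand-ins for the operator's attributes, and the tuples in the usage line stand in for `Row` objects.

```python
import logging
from typing import Any, Iterable

log = logging.getLogger(__name__)


def max_id_or_raise(result: Iterable[Any], select_command: str, table: str, max_id_key: str) -> Any:
    """Return column 0 of the first row; raise if the query returned no rows."""
    row = next(iter(result), None)
    # Guard clause: handle the empty case first and leave the function.
    if row is None:
        raise RuntimeError(f"The {select_command} returned no rows!")
    max_id = row[0]
    log.info("Loaded BQ data with max %s.%s=%s", table, max_id_key, max_id)
    return max_id


print(max_id_or_raise([(7,)], "SELECT MAX(id) FROM t", "proj.ds.t", "id"))  # 7
```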



[GitHub] [airflow] potiuk merged pull request #26768: Fix MaxID logic for GCSToBigQueryOperator

potiuk merged PR #26768:
URL: https://github.com/apache/airflow/pull/26768


[GitHub] [airflow] patricker commented on a diff in pull request #26768: Fix MaxID logic for GCSToBigQueryOperator

patricker commented on code in PR #26768:
URL: https://github.com/apache/airflow/pull/26768#discussion_r983803341


##########
airflow/providers/google/cloud/transfers/gcs_to_bigquery.py:
##########
@@ -319,9 +319,10 @@ def execute(self, context: Context):
                     location=self.location,
                     use_legacy_sql=False,
                 )
-            row = list(bq_hook.get_job(job_id=job_id, location=self.location).result())
-            if row:
-                max_id = row[0] if row[0] else 0
+            result = bq_hook.get_job(job_id=job_id, location=self.location).result()
+            row = next(iter(result), None)
+            if row is not None:

Review Comment:
   Thanks! Applied.



[GitHub] [airflow] patricker commented on pull request #26768: Fix MaxID logic for GCSToBigQueryOperator

patricker commented on PR #26768:
URL: https://github.com/apache/airflow/pull/26768#issuecomment-1261692687

   @uranusjr If you have access to GCP/BigQuery, this is the quick, simple script I used to test all of the logic first: https://cloud.google.com/bigquery/docs/samples/bigquery-query-results-dataframe


[GitHub] [airflow] patricker commented on a diff in pull request #26768: Fix MaxID logic for GCSToBigQueryOperator

patricker commented on code in PR #26768:
URL: https://github.com/apache/airflow/pull/26768#discussion_r983023477


##########
airflow/providers/google/cloud/transfers/gcs_to_bigquery.py:
##########
@@ -319,9 +319,10 @@ def execute(self, context: Context):
                     location=self.location,
                     use_legacy_sql=False,
                 )
-            row = list(bq_hook.get_job(job_id=job_id, location=self.location).result())
-            if row:
-                max_id = row[0] if row[0] else 0
+            result = list(bq_hook.get_job(job_id=job_id, location=self.location).result())
+            if result and len(result) > 0:
+                row = result[0]

Review Comment:
   Thanks! Looks good, and tested fine locally.


