Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/12 06:19:45 UTC

[GitHub] [airflow] pankajastro opened a new pull request, #22936: Type cast bigquery response

pankajastro opened a new pull request, #22936:
URL: https://github.com/apache/airflow/pull/22936

   Issue: 
   
   Currently, if I do not pass the `selected_fields` parameter to BigQueryGetDataOperator (which is equivalent to `select * from table`) and the BigQuery table has a column of type Date, Time, Datetime or Decimal, the Python JSON module fails to serialise the operator's response and the task fails.
   
   This happens because, when no column names are passed in the Google client request, each column value is returned as an object that the Python JSON module cannot serialise. When the column names are passed, the values are returned as strings.
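
A minimal sketch of the failure described above, using only the standard library. The row values are hypothetical: it is an assumption that the client hands back `datetime.date` and `Decimal` objects for such columns; the sketch only shows that the JSON module rejects objects of that kind while accepting their string forms.

```python
import datetime
import json
from decimal import Decimal

# Hypothetical row whose values are rich Python objects (assumed shape).
row_objects = [datetime.date(2022, 4, 12), Decimal("1.50")]
# The same row with every value already rendered as a string.
row_strings = ["2022-04-12", "1.50"]


def try_serialise(row):
    """Return the JSON text, or the TypeError message if serialisation fails."""
    try:
        return json.dumps(row)
    except TypeError as exc:
        return f"TypeError: {exc}"


print(try_serialise(row_objects))  # a TypeError message
print(try_serialise(row_strings))  # a JSON array of two strings
```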
   
   There is an existing workaround, setting `enable_xcom_pickling` to true in default_airflow.cfg, but this is insecure.
   
   Changes:
   - Send a list of all columns as selected_fields in case the selected_fields param is None
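
The change can be sketched in isolation as follows. This mirrors the diff hunk quoted in this thread rather than reproducing the operator: `default_selected_fields` is a hypothetical helper name, and the example schema dict is an assumed shape (a `"fields"` list of dicts with a `"name"` key), taken from how the diff reads it.

```python
from typing import Dict, Optional


def default_selected_fields(schema: Dict[str, list], selected_fields: Optional[str]) -> Optional[str]:
    """Fall back to a comma-separated list of all column names.

    Hypothetical helper: an explicit selected_fields value is left untouched.
    """
    if selected_fields:
        return selected_fields
    if schema.get("fields"):
        return ",".join(field["name"] for field in schema["fields"])
    return selected_fields


# Assumed schema shape, for illustration only.
example_schema = {"fields": [{"name": "id"}, {"name": "created_on"}, {"name": "amount"}]}

print(default_selected_fields(example_schema, None))
print(default_selected_fields(example_schema, "id"))
```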
   
   After this change, the task below will succeed and store the response in XCom for any BigQuery column type, without setting `enable_xcom_pickling` to true:
   ```
   get_data = BigQueryGetDataOperator(
           task_id='get_data_from_bq',
           dataset_id=DATASET_NAME,
           table_id='dummy_table',
           max_results=100,
       )
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk merged pull request #22936: Fix BigQueryGetDataOperator Response Serialisation

Posted by GitBox <gi...@apache.org>.
potiuk merged PR #22936:
URL: https://github.com/apache/airflow/pull/22936




[GitHub] [airflow] pankajastro commented on a diff in pull request #22936: Fix BigQueryGetDataOperator Response Serialisation

Posted by GitBox <gi...@apache.org>.
pankajastro commented on code in PR #22936:
URL: https://github.com/apache/airflow/pull/22936#discussion_r848419082


##########
airflow/providers/google/cloud/operators/bigquery.py:
##########
@@ -467,6 +467,14 @@ def execute(self, context: 'Context') -> list:
             impersonation_chain=self.impersonation_chain,
         )
 
+        if not self.selected_fields:
+            schema: Dict[str, list] = hook.get_schema(
+                dataset_id=self.dataset_id,
+                table_id=self.table_id,
+            )
+            if schema.get("fields"):
+                self.selected_fields = ','.join([field["name"] for field in schema["fields"]])

Review Comment:
   Here, `fields` are the columns of a BigQuery table, so no "fields" means the table has no columns. In that case we pass `self.selected_fields` as None to the Google client SDK and I get an empty response, so that won't be a problem.





[GitHub] [airflow] uranusjr commented on a diff in pull request #22936: Fix BigQueryGetDataOperator Response Serialisation

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #22936:
URL: https://github.com/apache/airflow/pull/22936#discussion_r848821059


##########
airflow/providers/google/cloud/operators/bigquery.py:
##########
@@ -467,6 +467,14 @@ def execute(self, context: 'Context') -> list:
             impersonation_chain=self.impersonation_chain,
         )
 
+        if not self.selected_fields:
+            schema: Dict[str, list] = hook.get_schema(
+                dataset_id=self.dataset_id,
+                table_id=self.table_id,
+            )
+            if schema.get("fields"):
+                self.selected_fields = ','.join([field["name"] for field in schema["fields"]])

Review Comment:
   This should be `if "fields" in schema` then
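
A small sketch of how the two checks differ, using hypothetical schema dicts. Both helpers are illustrative names, not part of the PR. The only case where the results diverge is a `"fields"` key holding an empty list.

```python
# Hypothetical schemas: populated, present-but-empty, and key missing.
schemas = {
    "with_columns": {"fields": [{"name": "id"}, {"name": "created"}]},
    "empty_list": {"fields": []},
    "no_key": {},
}


def via_get(schema):
    """Truthiness check: skips both a missing key and an empty list."""
    if schema.get("fields"):
        return ",".join(f["name"] for f in schema["fields"])
    return None


def via_in(schema):
    """Membership check: skips only a missing key."""
    if "fields" in schema:
        return ",".join(f["name"] for f in schema["fields"])
    return None


for label, schema in schemas.items():
    print(label, repr(via_get(schema)), repr(via_in(schema)))
```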





[GitHub] [airflow] pankajastro commented on a diff in pull request #22936: Fix BigQueryGetDataOperator Response Serialisation

Posted by GitBox <gi...@apache.org>.
pankajastro commented on code in PR #22936:
URL: https://github.com/apache/airflow/pull/22936#discussion_r848420264


##########
airflow/providers/google/cloud/operators/bigquery.py:
##########
@@ -467,6 +467,14 @@ def execute(self, context: 'Context') -> list:
             impersonation_chain=self.impersonation_chain,
         )
 
+        if not self.selected_fields:
+            schema: Dict[str, list] = hook.get_schema(
+                dataset_id=self.dataset_id,
+                table_id=self.table_id,
+            )
+            if schema.get("fields"):
+                self.selected_fields = ','.join([field["name"] for field in schema["fields"]])

Review Comment:
   Also, control enters the `if` block only if `schema.get("fields")` exists.





[GitHub] [airflow] github-actions[bot] commented on pull request #22936: Fix BigQueryGetDataOperator Response Serialisation

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22936:
URL: https://github.com/apache/airflow/pull/22936#issuecomment-1097611486

   The PR is likely OK to be merged with just subset of tests for default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full tests matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.




[GitHub] [airflow] pankajastro commented on a diff in pull request #22936: Fix BigQueryGetDataOperator Response Serialisation

Posted by GitBox <gi...@apache.org>.
pankajastro commented on code in PR #22936:
URL: https://github.com/apache/airflow/pull/22936#discussion_r849103863


##########
airflow/providers/google/cloud/operators/bigquery.py:
##########
@@ -467,6 +467,14 @@ def execute(self, context: 'Context') -> list:
             impersonation_chain=self.impersonation_chain,
         )
 
+        if not self.selected_fields:
+            schema: Dict[str, list] = hook.get_schema(
+                dataset_id=self.dataset_id,
+                table_id=self.table_id,
+            )
+            if schema.get("fields"):
+                self.selected_fields = ','.join([field["name"] for field in schema["fields"]])

Review Comment:
   Makes sense. Updated the PR with the suggested change.





[GitHub] [airflow] uranusjr commented on a diff in pull request #22936: Fix BigQueryGetDataOperator Response Serialisation

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #22936:
URL: https://github.com/apache/airflow/pull/22936#discussion_r848408471


##########
airflow/providers/google/cloud/operators/bigquery.py:
##########
@@ -467,6 +467,14 @@ def execute(self, context: 'Context') -> list:
             impersonation_chain=self.impersonation_chain,
         )
 
+        if not self.selected_fields:
+            schema: Dict[str, list] = hook.get_schema(
+                dataset_id=self.dataset_id,
+                table_id=self.table_id,
+            )
+            if schema.get("fields"):
+                self.selected_fields = ','.join([field["name"] for field in schema["fields"]])

Review Comment:
   When is `"fields"` not present in `schema`, and what would happen?


