Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/06/15 08:33:26 UTC

[GitHub] [airflow] MazrimT opened a new issue, #24460: let BigQueryGetData operator take a query string and as_dict flag

MazrimT opened a new issue, #24460:
URL: https://github.com/apache/airflow/issues/24460

   ### Description
   
   Today `airflow.providers.google.cloud.operators.bigquery.BigQueryGetDataOperator` only lets you point to a specific dataset and table and choose how many rows you want.
   
   It already sets up a BigQueryHook, so it would be easy to also support running a custom query from a string.
   It would also be very convenient to have an as_dict flag to return the result as a list of dicts.
   I am not an expert in Python, but here is my attempt at a modification of the current code (from 8.0.0rc2):
   
   ``` python
   class BigQueryGetDataOperator(BaseOperator):
       """
       Fetches the data from a BigQuery table (alternatively fetch data for selected columns)
       and returns data in a python list. The number of elements in the returned list will
       be equal to the number of rows fetched. Each element in the list will again be a list
       where each element represents the column values for that row.
   
       **Example Result**: ``[['Tony', '10'], ['Mike', '20'], ['Steve', '15']]``
   
       .. seealso::
           For more information on how to use this operator, take a look at the guide:
           :ref:`howto/operator:BigQueryGetDataOperator`
   
       .. note::
           If you pass fields to ``selected_fields`` which are in different order than the
           order of columns already in
           BQ table, the data will still be in the order of BQ table.
           For example if the BQ table has 3 columns as
           ``[A,B,C]`` and you pass 'B,A' in the ``selected_fields``
           the data would still be of the form ``'A,B'``.
   
       **Example**: ::
   
           get_data = BigQueryGetDataOperator(
               task_id='get_data_from_bq',
               dataset_id='test_dataset',
               table_id='Transaction_partitions',
               max_results=100,
               selected_fields='DATE',
               gcp_conn_id='airflow-conn-id'
           )
   
       :param dataset_id: The dataset ID of the requested table. (templated)
       :param table_id: The table ID of the requested table. (templated)
       :param max_results: The maximum number of records (rows) to be fetched
           from the table. (templated)
       :param selected_fields: List of fields to return (comma-separated). If
           unspecified, all fields are returned.
       :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
       :param delegate_to: The account to impersonate using domain-wide delegation of authority,
           if any. For this to work, the service account making the request must have
           domain-wide delegation enabled.
       :param location: The location used for the operation.
       :param impersonation_chain: Optional service account to impersonate using short-term
           credentials, or chained list of accounts required to get the access_token
           of the last account in the list, which will be impersonated in the request.
           If set as a string, the account must grant the originating account
           the Service Account Token Creator IAM role.
           If set as a sequence, the identities from the list must grant
           Service Account Token Creator IAM role to the directly preceding identity, with first
           account from the list granting this role to the originating account (templated).
       :param query: (Optional) A SQL query to execute instead of reading data from a specific dataset and table.
       :param as_dict: Whether the result should be returned as a list of dictionaries. Defaults to False.
       """
   
       template_fields: Sequence[str] = (
           'dataset_id',
           'table_id',
           'max_results',
           'selected_fields',
           'impersonation_chain',
       )
       ui_color = BigQueryUIColors.QUERY.value
   
       def __init__(
           self,
           *,
           dataset_id: str,
           table_id: str,
           max_results: int = 100,
           selected_fields: Optional[str] = None,
           gcp_conn_id: str = 'google_cloud_default',
           delegate_to: Optional[str] = None,
           location: Optional[str] = None,
           impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
           query: Optional[str] = None,
           as_dict: bool = False,
           **kwargs,
       ) -> None:
           super().__init__(**kwargs)
   
           self.dataset_id = dataset_id
           self.table_id = table_id
           self.max_results = int(max_results)
           self.selected_fields = selected_fields
           self.gcp_conn_id = gcp_conn_id
           self.delegate_to = delegate_to
           self.location = location
           self.impersonation_chain = impersonation_chain
           self.query = query
           self.as_dict = as_dict
   
       def execute(self, context: 'Context') -> list:
           self.log.info(
               'Fetching Data from %s.%s max results: %s', self.dataset_id, self.table_id, self.max_results
           )
   
           hook = BigQueryHook(
               gcp_conn_id=self.gcp_conn_id,
               delegate_to=self.delegate_to,
               impersonation_chain=self.impersonation_chain,
               location=self.location,
           )
   
           if not self.query:
               if not self.selected_fields:
                   schema: Dict[str, list] = hook.get_schema(
                       dataset_id=self.dataset_id,
                       table_id=self.table_id,
                   )
                   if "fields" in schema:
                       self.selected_fields = ','.join([field["name"] for field in schema["fields"]])
   
               rows = hook.list_rows(
                   dataset_id=self.dataset_id,
                   table_id=self.table_id,
                   max_results=self.max_results,
                   selected_fields=self.selected_fields
               )
               if self.as_dict:
                   # selected_fields is a comma-separated string, so split it into column names
                   table_data = [
                       json.dumps(dict(zip(self.selected_fields.split(','), row))).encode('utf-8') for row in rows
                   ]
               else:
                   table_data = [row.values() for row in rows]

           else:
               cursor = hook.get_conn().cursor()
               cursor.execute(self.query)
               if self.as_dict:
                   # column names from the standard DB-API cursor description
                   # (assumed to be implemented by the hook's cursor)
                   keys = [col[0] for col in cursor.description]
                   table_data = [json.dumps(dict(zip(keys, row))).encode('utf-8') for row in cursor.fetchall()]
               else:
                   table_data = list(cursor.fetchall())
   
           self.log.info('Total extracted rows: %s', len(table_data))
   
           return table_data
   ```
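
   For illustration, here is a hypothetical DAG snippet using the proposed parameters. The SQL, task id and connection id are placeholders, and `query`/`as_dict` only exist in the modified operator sketched above:
   
   ``` python
   get_names = BigQueryGetDataOperator(
       task_id='get_names_from_bq',
       dataset_id='test_dataset',          # still required by the constructor above
       table_id='Transaction_partitions',  # ignored once query is set
       query='SELECT name, amount FROM test_dataset.Transaction_partitions LIMIT 100',
       as_dict=True,                       # each row comes back as a JSON-encoded dict
       gcp_conn_id='airflow-conn-id',
   )
   ```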
   
   ### Use case/motivation
   
   This would simplify getting data from BigQuery into Airflow, instead of having to first store the data in a separate table with BigQueryInsertJob and then fetch that.
   It would also simplify handling the data: with as_dict, results come back the same way many other Python database connectors return them.
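
   For comparison, the current two-step pattern this would replace looks roughly like the following sketch, using the existing BigQueryInsertJobOperator and BigQueryGetDataOperator (project, dataset and table names are illustrative):
   
   ``` python
   store_result = BigQueryInsertJobOperator(
       task_id='store_result',
       configuration={
           "query": {
               "query": "SELECT name, amount FROM test_dataset.source_table",
               "destinationTable": {
                   "projectId": "my-project",
                   "datasetId": "test_dataset",
                   "tableId": "tmp_result",
               },
               "writeDisposition": "WRITE_TRUNCATE",  # overwrite the staging table on re-runs
               "useLegacySql": False,
           }
       },
   )
   fetch_result = BigQueryGetDataOperator(
       task_id='fetch_result',
       dataset_id='test_dataset',
       table_id='tmp_result',
       max_results=100,
   )
   store_result >> fetch_result
   ```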
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] shahar1 commented on issue #24460: let BigQueryGetData operator take a query string and as_dict flag

Posted by "shahar1 (via GitHub)" <gi...@apache.org>.
shahar1 commented on issue #24460:
URL: https://github.com/apache/airflow/issues/24460#issuecomment-1524867578

   I created a PR for adding `as_dict` to the `BigQueryGetData` operator.
   Regarding the usage of a query string, I believe it may not be appropriate for this operator. This is because it functions as a higher-level operator that only requires parts of the SQL SELECT query, as opposed to `BigQueryInsertJobOperator` which requires the explicit SQL query.
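
   For reference, a rough sketch of the difference the `as_dict` flag makes to the value the operator returns (row values and column names below are only illustrative):
   
   ``` python
   # as_dict=False (current behaviour): a list of row value lists
   rows = [['Tony', '10'], ['Mike', '20'], ['Steve', '15']]
   
   # as_dict=True: one mapping per row, keyed by column name
   rows_as_dicts = [
       {'name': 'Tony', 'total': '10'},
       {'name': 'Mike', 'total': '20'},
       {'name': 'Steve', 'total': '15'},
   ]
   ```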


[GitHub] [airflow] MazrimT commented on issue #24460: let BigQueryGetData operator take a query string and as_dict flag

Posted by GitBox <gi...@apache.org>.
MazrimT commented on issue #24460:
URL: https://github.com/apache/airflow/issues/24460#issuecomment-1156379225

   I have created a PR: https://github.com/apache/airflow/pull/24468


[GitHub] [airflow] eladkal closed issue #24460: let BigQueryGetData operator take a query string and as_dict flag

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal closed issue #24460: let BigQueryGetData operator take a query string and as_dict flag
URL: https://github.com/apache/airflow/issues/24460


[GitHub] [airflow] boring-cyborg[bot] commented on issue #24460: let BigQueryGetData operator take a query string and as_dict flag

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #24460:
URL: https://github.com/apache/airflow/issues/24460#issuecomment-1156165727

   Thanks for opening your first issue here! Be sure to follow the issue template!
   


[GitHub] [airflow] eladkal commented on issue #24460: let BigQueryGetData operator take a query string and as_dict flag

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on issue #24460:
URL: https://github.com/apache/airflow/issues/24460#issuecomment-1434928470

   assigned


[GitHub] [airflow] potiuk commented on issue #24460: let BigQueryGetData operator take a query string and as_dict flag

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #24460:
URL: https://github.com/apache/airflow/issues/24460#issuecomment-1156318944

   Assigned you - please open a PR - it will let you show what you really want to do - it's way better because comments can be made against actual code.


[GitHub] [airflow] shahar1 commented on issue #24460: let BigQueryGetData operator take a query string and as_dict flag

Posted by "shahar1 (via GitHub)" <gi...@apache.org>.
shahar1 commented on issue #24460:
URL: https://github.com/apache/airflow/issues/24460#issuecomment-1426774718

   @eladkal Please assign me

