Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/01 13:30:07 UTC

[GitHub] [airflow] wojsamjan opened a new pull request, #26102: BigQuery deferrable operators

wojsamjan opened a new pull request, #26102:
URL: https://github.com/apache/airflow/pull/26102

   Add BigQuery deferrable operators


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] kaxil commented on a diff in pull request #26102: BigQuery deferrable operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #26102:
URL: https://github.com/apache/airflow/pull/26102#discussion_r961160649


##########
airflow/providers/google/common/hooks/base_google.py:
##########
@@ -602,3 +603,26 @@ def test_connection(self):
             message = str(e)
 
         return status, message
+
+
+class GoogleBaseHookAsync(BaseHook):
+    """GoogleBaseHookAsync inherits from BaseHook class, run on the trigger worker"""
+
+    sync_hook_class: Any = None
+
+    def __init__(self, **kwargs: Any):
+        self._hook_kwargs = kwargs
+        self._sync_hook = None
+
+    async def get_sync_hook(self) -> Any:
+        """
+        Sync version of the Google Cloud Hooks makes blocking calls in ``__init__`` so we don't inherit
+        from it.
+        """
+        if not self._sync_hook:
+            self._sync_hook = await sync_to_async(self.sync_hook_class)(**self._hook_kwargs)
+        return self._sync_hook
+
+    async def service_file_as_context(self) -> Any:
+        sync_hook = await self.get_sync_hook()
+        return await sync_to_async(sync_hook.provide_gcp_credential_file_as_context)()

Review Comment:
   Same code as 
   https://github.com/astronomer/astronomer-providers/blob/main/astronomer/providers/google/common/hooks/base_google.py#L7-L27
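The docstring in the hunk above says the sync hook makes blocking calls in ``__init__``, so the async class wraps it rather than inheriting from it. A minimal sketch of that pattern follows. It is not the PR's code: it assumes Python 3.9+ and uses the standard library's `asyncio.to_thread` in place of `sync_to_async`, and `SlowSyncHook` and `AsyncHookWrapper` are hypothetical names made up for illustration.

```python
import asyncio
import time
from typing import Any, Optional, Tuple


class SlowSyncHook:
    """Hypothetical stand-in for a hook whose constructor blocks."""

    def __init__(self, **kwargs: Any) -> None:
        time.sleep(0.05)  # simulates a blocking call made during construction
        self.kwargs = kwargs


class AsyncHookWrapper:
    """Hypothetical wrapper that builds the sync hook lazily, off the event loop thread."""

    sync_hook_class: Any = SlowSyncHook

    def __init__(self, **kwargs: Any) -> None:
        # Only store the arguments here; nothing blocking happens in __init__.
        self._hook_kwargs = kwargs
        self._sync_hook: Optional[Any] = None

    async def get_sync_hook(self) -> Any:
        if self._sync_hook is None:
            # Run the blocking constructor in a worker thread so the loop stays responsive.
            self._sync_hook = await asyncio.to_thread(self.sync_hook_class, **self._hook_kwargs)
        return self._sync_hook


async def main() -> Tuple[Any, Any]:
    wrapper = AsyncHookWrapper(gcp_conn_id="my_conn")
    first = await wrapper.get_sync_hook()
    second = await wrapper.get_sync_hook()  # served from the cached instance
    return first, second


first, second = asyncio.run(main())
```

The second call returns the cached instance, so the blocking constructor runs only once per wrapper.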





[GitHub] [airflow] kaxil commented on pull request #26102: BigQuery deferrable operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on PR #26102:
URL: https://github.com/apache/airflow/pull/26102#issuecomment-1237991574

   Closing this in favor of https://github.com/apache/airflow/pull/26156 




[GitHub] [airflow] uranusjr commented on a diff in pull request #26102: BigQuery deferrable operators

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26102:
URL: https://github.com/apache/airflow/pull/26102#discussion_r963309290


##########
airflow/providers/google/cloud/hooks/bigquery.py:
##########
@@ -2821,6 +2824,221 @@ def _get_query_result(self) -> Dict:
         return query_results
 
 
+class BigQueryHookAsync(GoogleBaseHookAsync):
+    """BigQueryHookAsync inherits from GoogleBaseHookAsync class. Interacts with Google BigQuery"""
+
+    sync_hook_class = BigQueryHook
+
+    async def get_job_instance(
+        self, project_id: Optional[str], job_id: Optional[str], session: ClientSession
+    ) -> Job:
+        """Get the specified job resource by job ID and project ID."""
+        with await self.service_file_as_context() as f:
+            return Job(job_id=job_id, project=project_id, service_file=f, session=cast(Session, session))
+
+    async def get_job_status(
+        self,
+        job_id: Optional[str],
+        project_id: Optional[str] = None,
+    ) -> Optional[str]:
+        """
+        Polls for job status asynchronously using gcloud-aio.
+
+        Note that an OSError is raised when Job results are still pending.
+        """
+        async with ClientSession() as s:
+            try:
+                self.log.info("Executing get_job_status...")
+                job_client = await self.get_job_instance(project_id, job_id, s)
+                job_status_response = await job_client.result(cast(Session, s))
+                if job_status_response:
+                    job_status = "success"
+            except OSError:
+                job_status = "pending"
+            except Exception as e:
+                self.log.info("Query execution finished with errors...")
+                job_status = str(e)
+            return job_status
+
+    async def get_job_output(
+        self,
+        job_id: Optional[str],
+        project_id: Optional[str] = None,
+    ) -> Dict[str, Any]:
+        """Get the big query job output for the given job id asynchronously using gcloud-aio."""
+        async with ClientSession() as session:
+            self.log.info("Executing get_job_output..")
+            job_client = await self.get_job_instance(project_id, job_id, session)
+            job_query_response = await job_client.get_query_results(cast(Session, session))
+            return job_query_response
+
+    def get_records(self, query_results: Dict[str, Any]) -> List[Any]:
+        """
+        Given the output query response from gcloud aio bigquery, convert the response to records.
+
+        :param query_results: the results from a SQL query
+        """
+        buffer = []
+        for dict_row in query_results.get("rows", {}):
+            buffer.append([vs["v"] for vs in dict_row["f"]])
+        return buffer
+
+    def value_check(
+        self,
+        sql: str,
+        pass_value: Any,
+        records: List[Any],
+        tolerance: Optional[float] = None,
+    ) -> None:
+        """
+        Match a single query resulting row and tolerance with pass_value
+
+        :return: Raises AirflowException if there is no match.

Review Comment:
   ```suggestion
           :raises: AirflowException if there is no match.
   ```
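For readers following the `get_records` method in the hunk above, here is a standalone sketch of the same row flattening. The response shape (`rows` holding dicts with an `f` list of `{"v": ...}` cells) is inferred from the keys the diff reads. The function name `rows_to_records` and the sample payload are hypothetical.

```python
from typing import Any, Dict, List


def rows_to_records(query_results: Dict[str, Any]) -> List[List[Any]]:
    """Flatten a query response into one list of cell values per row."""
    records = []
    # A response without a "rows" key is treated as an empty result set.
    for row in query_results.get("rows", []):
        records.append([cell["v"] for cell in row["f"]])
    return records


# Hypothetical payload shaped like the keys the diff accesses.
sample = {
    "rows": [
        {"f": [{"v": "1"}, {"v": "a"}]},
        {"f": [{"v": "2"}, {"v": "b"}]},
    ]
}
records = rows_to_records(sample)
empty = rows_to_records({})
```

The sketch defaults to an empty list rather than the `{}` used in the diff; both iterate zero times, so the result is the same.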





[GitHub] [airflow] kaxil closed pull request #26102: BigQuery deferrable operators

Posted by GitBox <gi...@apache.org>.
kaxil closed pull request #26102: BigQuery deferrable operators
URL: https://github.com/apache/airflow/pull/26102




[GitHub] [airflow] uranusjr commented on a diff in pull request #26102: BigQuery deferrable operators

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26102:
URL: https://github.com/apache/airflow/pull/26102#discussion_r963308962


##########
airflow/providers/google/cloud/hooks/bigquery.py:
##########
@@ -2821,6 +2824,221 @@ def _get_query_result(self) -> Dict:
         return query_results
 
 
+class BigQueryHookAsync(GoogleBaseHookAsync):
+    """BigQueryHookAsync inherits from GoogleBaseHookAsync class. Interacts with Google BigQuery"""
+
+    sync_hook_class = BigQueryHook
+
+    async def get_job_instance(
+        self, project_id: Optional[str], job_id: Optional[str], session: ClientSession
+    ) -> Job:
+        """Get the specified job resource by job ID and project ID."""
+        with await self.service_file_as_context() as f:
+            return Job(job_id=job_id, project=project_id, service_file=f, session=cast(Session, session))

Review Comment:
   This `cast` doesn’t feel right… do you have a reference for this?
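As background for the question above: `typing.cast` only informs the static type checker and returns its argument unchanged at runtime. The sketch below shows this with two hypothetical stand-in classes. They are not the real aiohttp or gcloud-aio types, and nothing here says whether those two types are compatible in practice.

```python
from typing import cast


class Session:
    """Hypothetical stand-in for the type a callee is annotated with."""


class ClientSession:
    """Hypothetical stand-in for the object actually passed in."""


original = ClientSession()
casted = cast(Session, original)

# cast() performs no conversion and no runtime check.
same_object = casted is original
still_client_session = type(casted) is ClientSession
```

So a `cast` can silence a type error without making the object behave like the target type. Whether that is safe depends on the callee only using attributes the real object provides.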





[GitHub] [airflow] uranusjr commented on a diff in pull request #26102: BigQuery deferrable operators

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26102:
URL: https://github.com/apache/airflow/pull/26102#discussion_r963306494


##########
airflow/providers/google/cloud/hooks/bigquery.py:
##########
@@ -29,8 +29,10 @@
 import warnings
 from copy import deepcopy
 from datetime import datetime, timedelta
-from typing import Any, Dict, Iterable, List, Mapping, NoReturn, Optional, Sequence, Tuple, Type, Union
+from typing import Any, Dict, Iterable, List, Mapping, NoReturn, Optional, Sequence, Tuple, Type, Union, cast
 
+from aiohttp import ClientSession as ClientSession

Review Comment:
   ```suggestion
   from aiohttp import ClientSession
   ```





[GitHub] [airflow] kaxil commented on a diff in pull request #26102: BigQuery deferrable operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #26102:
URL: https://github.com/apache/airflow/pull/26102#discussion_r961157303


##########
tests/providers/google/cloud/hooks/test_bigquery.py:
##########
@@ -2053,3 +2055,195 @@ def test_create_external_table_description(self, mock_create):
 
         _, kwargs = mock_create.call_args
         assert kwargs['table_resource']['description'] is description
+
+
+@pytest.mark.asyncio
+@mock.patch("airflow.providers.google.cloud.hooks.bigquery.ClientSession")
+async def test_get_job_instance(mock_session):
+    hook = BigQueryHookAsync()
+    result = await hook.get_job_instance(project_id=PROJECT_ID, job_id=JOB_ID, session=mock_session)
+    assert isinstance(result, Job)

Review Comment:
   https://github.com/astronomer/astronomer-providers/blob/main/tests/google/cloud/hooks/test_bigquery.py#L74-L79
   
   Tests are also copied from astronomer providers.
   


