Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/26 10:00:18 UTC

[GitHub] [airflow] bjankie1 opened a new pull request, #25302: Dataproc submit job operator async

bjankie1 opened a new pull request, #25302:
URL: https://github.com/apache/airflow/pull/25302

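   Add a `deferrable` mode to the existing `DataprocJobBaseOperator` and `DataprocSubmitJobOperator` operators. In this mode the operators submit the job and then defer, handing job-status polling to a new trigger, `DataprocBaseTrigger`.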
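For readers new to deferrable operators: the operator submits the job and then defers, which frees its worker slot while a trigger running in the triggerer process waits for the job; when the trigger fires, the task resumes in a callback method. The stdlib-only sketch below models that control flow under simplifying assumptions. `TaskDeferredSignal`, `FakeTrigger`, `SubmitJobOperatorSketch` and `run_task` are hypothetical stand-ins; in Airflow itself, `BaseOperator.defer()` raises `TaskDeferred`, and a trigger's `run()` is an async generator yielding `TriggerEvent`s rather than a coroutine returning a dict.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, Optional


class TaskDeferredSignal(Exception):
    """Hypothetical stand-in for Airflow's TaskDeferred exception."""

    def __init__(self, trigger: "FakeTrigger", method_name: str) -> None:
        super().__init__(method_name)
        self.trigger = trigger
        self.method_name = method_name


@dataclass
class FakeTrigger:
    """Hypothetical trigger: pretends the job finishes after a few polls."""

    job_id: str
    polls_until_done: int = 3

    async def run(self) -> Dict[str, Any]:
        for _ in range(self.polls_until_done):
            await asyncio.sleep(0)  # stands in for sleeping between API polls
        return {"job_id": self.job_id, "status": "DONE"}


class SubmitJobOperatorSketch:
    def __init__(self, deferrable: bool = False) -> None:
        self.deferrable = deferrable

    def execute(self) -> Optional[str]:
        job_id = "job-123"  # hypothetical: the ID returned by the submit call
        if self.deferrable:
            # Give up the worker slot; the trigger does the waiting.
            raise TaskDeferredSignal(FakeTrigger(job_id), "execute_complete")
        # Non-deferrable path: a worker would block here until completion.
        return job_id

    def execute_complete(self, event: Dict[str, Any]) -> str:
        if event["status"] != "DONE":
            raise RuntimeError(f"Job {event['job_id']} failed")
        return event["job_id"]


def run_task(op: SubmitJobOperatorSketch) -> Optional[str]:
    """Tiny model of worker and triggerer cooperating on one task."""
    try:
        return op.execute()
    except TaskDeferredSignal as deferred:
        event = asyncio.run(deferred.trigger.run())  # the triggerer's part
        return getattr(op, deferred.method_name)(event)
```

The `deferrable` flag only changes what happens after submission; both paths end with the same job ID.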
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pankajastro commented on a diff in pull request #25302: Dataproc submit job operator async

Posted by GitBox <gi...@apache.org>.
pankajastro commented on code in PR #25302:
URL: https://github.com/apache/airflow/pull/25302#discussion_r936597733


##########
airflow/providers/google/cloud/operators/dataproc.py:
##########
@@ -1771,6 +1808,9 @@ class DataprocSubmitJobOperator(BaseOperator):
     :param asynchronous: Flag to return after submitting the job to the Dataproc API.
         This is useful for submitting long running jobs and
         waiting on them asynchronously using the DataprocJobSensor
+    :param deferrable: Run operator in the deferrable mode
+    :param polling_interval_seconds time in seconds between polling for job completion.

Review Comment:
   ```suggestion
       :param polling_interval_seconds: time in seconds between polling for job completion.
   ```
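Sphinx field lists need the colon after the parameter name (`:param name: description`); without it the line is not parsed as a `:param:` field. A small check like the following would flag such lines. `find_malformed_params` is a hypothetical helper for illustration, not one of Airflow's pre-commit hooks.

```python
import re
from typing import List

# Any line that starts a ":param" field.
_PARAM_LINE = re.compile(r"^\s*:param\s")
# A well-formed field: ":param", a name (optionally preceded by a type), then ":".
_WELL_FORMED = re.compile(r"^\s*:param\s+[^:\s][^:]*:")


def find_malformed_params(docstring: str) -> List[str]:
    """Return ':param' lines that never close the field name with ':'."""
    return [
        line.strip()
        for line in docstring.splitlines()
        if _PARAM_LINE.match(line) and not _WELL_FORMED.match(line)
    ]
```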




[GitHub] [airflow] potiuk commented on pull request #25302: Dataproc submit job operator async

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25302:
URL: https://github.com/apache/airflow/pull/25302#issuecomment-1203811571

   And the static checks and docs need to be fixed too.



[GitHub] [airflow] boring-cyborg[bot] commented on pull request #25302: Dataproc submit job operator async

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on PR #25302:
URL: https://github.com/apache/airflow/pull/25302#issuecomment-1222814016

   Awesome work, congrats on your first merged pull request!
   



[GitHub] [airflow] potiuk commented on pull request #25302: Dataproc submit job operator async

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25302:
URL: https://github.com/apache/airflow/pull/25302#issuecomment-1199202739

   Some tests are failing.



[GitHub] [airflow] potiuk commented on pull request #25302: Dataproc submit job operator async

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25302:
URL: https://github.com/apache/airflow/pull/25302#issuecomment-1206699494

   Test failing :(



[GitHub] [airflow] mik-laj commented on a diff in pull request #25302: Dataproc submit job operator async

Posted by GitBox <gi...@apache.org>.
mik-laj commented on code in PR #25302:
URL: https://github.com/apache/airflow/pull/25302#discussion_r936040468


##########
airflow/providers/google/cloud/hooks/dataproc.py:
##########
@@ -958,3 +973,736 @@ def list_batches(
             metadata=metadata,
         )
         return result
+
+
+class DataprocAsyncHook(GoogleBaseHook):
+    """
+    Asynchronuous Hook for Google Cloud Dataproc APIs.
+
+    All the methods in the hook where project_id is used must be called with
+    keyword arguments rather than positional.
+    """
+
+    def __init__(
+        self,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(gcp_conn_id, delegate_to, impersonation_chain)
+
+    def get_cluster_client(self, region: Optional[str] = None) -> ClusterControllerAsyncClient:
+        """Returns ClusterControllerAsyncClient."""
+        client_options = None
+        if region and region != 'global':
+            client_options = ClientOptions(api_endpoint=f'{region}-dataproc.googleapis.com:443')
+
+        return ClusterControllerAsyncClient(
+            credentials=self._get_credentials(), client_info=CLIENT_INFO, client_options=client_options
+        )
+
+    def get_template_client(self, region: Optional[str] = None) -> WorkflowTemplateServiceAsyncClient:
+        """Returns WorkflowTemplateServiceAsyncClient."""
+        client_options = None
+        if region and region != 'global':
+            client_options = ClientOptions(api_endpoint=f'{region}-dataproc.googleapis.com:443')
+
+        return WorkflowTemplateServiceAsyncClient(
+            credentials=self._get_credentials(), client_info=CLIENT_INFO, client_options=client_options
+        )
+
+    def get_job_client(self, region: Optional[str] = None) -> JobControllerAsyncClient:
+        """Returns JobControllerAsyncClient."""
+        client_options = None
+        if region and region != 'global':
+            client_options = ClientOptions(api_endpoint=f'{region}-dataproc.googleapis.com:443')
+
+        return JobControllerAsyncClient(
+            credentials=self._get_credentials(),
+            client_info=CLIENT_INFO,
+            client_options=client_options,
+        )
+
+    def get_batch_client(self, region: Optional[str] = None) -> BatchControllerAsyncClient:
+        """Returns BatchControllerAsyncClient"""
+        client_options = None
+        if region and region != 'global':
+            client_options = ClientOptions(api_endpoint=f'{region}-dataproc.googleapis.com:443')
+
+        return BatchControllerAsyncClient(
+            credentials=self._get_credentials(), client_info=CLIENT_INFO, client_options=client_options
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    async def create_cluster(
+        self,
+        region: str,
+        project_id: str,

Review Comment:
   This parameter should be set to None by default. 
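Defaulting `project_id` to `None` matches how `GoogleBaseHook.fallback_to_default_project_id` is meant to be used: callers may omit the argument and the decorator fills in the project configured on the connection, so the signature should advertise the parameter as optional. The decorator below is a simplified, hypothetical re-creation of that idea for illustration only; it is not Airflow's implementation, and `HookSketch` and `default_project_id` are made-up names.

```python
import asyncio
import functools
from typing import Any, Callable, Optional


def fallback_to_default_project_id(func: Callable[..., Any]) -> Callable[..., Any]:
    """Simplified sketch: fill in project_id from the hook when it is omitted or None."""

    @functools.wraps(func)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        if args:
            # Positional calls are rejected so project_id can always be found by name.
            raise TypeError("Use keyword arguments when calling this method")
        kwargs["project_id"] = kwargs.get("project_id") or self.default_project_id
        if not kwargs["project_id"]:
            raise ValueError("project_id is set neither in the call nor on the connection")
        # For an async method this returns the coroutine for the caller to await.
        return func(self, **kwargs)

    return wrapper


class HookSketch:
    def __init__(self, default_project_id: Optional[str]) -> None:
        self.default_project_id = default_project_id

    @fallback_to_default_project_id
    async def create_cluster(self, region: str, project_id: Optional[str] = None) -> str:
        # With a None default, callers may omit project_id entirely.
        return f"{project_id}/{region}"
```

Usage: `asyncio.run(HookSketch("conn-project").create_cluster(region="europe-west1"))` resolves the project from the hook rather than from the call.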

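The four `get_*_client` methods in the hook diff repeat one rule: for a concrete region the client talks to `{region}-dataproc.googleapis.com:443`, while `None` or `'global'` keeps the library's default endpoint. A sketch of factoring that rule into a single helper; `regional_api_endpoint` is a hypothetical name, and the hook would still wrap a non-`None` result in `ClientOptions(api_endpoint=...)`.

```python
from typing import Optional


def regional_api_endpoint(region: Optional[str]) -> Optional[str]:
    """Return the Dataproc endpoint for a region, or None to keep the default endpoint."""
    if region and region != "global":
        return f"{region}-dataproc.googleapis.com:443"
    # No region, or the 'global' pseudo-region: let the client use its default.
    return None
```

Keeping the rule in one place means the cluster, template, job and batch clients cannot drift apart if the endpoint format changes.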



[GitHub] [airflow] bharanidharan14 commented on a diff in pull request #25302: Dataproc submit job operator async

Posted by GitBox <gi...@apache.org>.
bharanidharan14 commented on code in PR #25302:
URL: https://github.com/apache/airflow/pull/25302#discussion_r934452024


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -0,0 +1,69 @@
+import asyncio
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.dataproc import DataprocAsyncHook
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+from google.cloud.dataproc_v1 import JobStatus, Job
+from typing import Any, Dict, Optional, Sequence, Tuple, Union
+
+
+class DataprocBaseTrigger(BaseTrigger):
+    """
+    Trigger that periodically pollls information from Dataproc API to verify job status.
+    Implementation leverages asynchronous transport.
+    """
+
+    def __init__(
+        self,
+        job_id: str,
+        project_id: str,
+        region: str,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        delegate_to: Optional[str] = None,
+        pooling_period_seconds: int = 30,
+    ):
+        super().__init__()
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.job_id = job_id
+        self.project_id = project_id
+        self.region = region
+        self.pooling_period_seconds = pooling_period_seconds
+        self.delegate_to = delegate_to
+        self.hook = DataprocAsyncHook(
+            delegate_to=self.delegate_to,
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    def serialize(self):
+        return (
+            "airflow.providers.google.cloud.operators.dataproc.DataprocBaseTrigger",
+            {
+                "job_id": self.job_id,
+                "project_id": self.project_id,
+                "region": self.region,
+                "gcp_conn_id": self.gcp_conn_id,
+                "delegate_to": self.delegate_to,
+                "impersonation_chain": self.impersonation_chain,
+                "pooling_period_seconds": self.pooling_period_seconds,
+            },
+        )
+
+    async def run(self):
+        while True:
+            job = await self.hook.get_job(project_id=self.project_id, region=self.region, job_id=self.job_id)
+            state = job.status.state
+            self.log.info("Dataproc job: %s is in state: %s", self.job_id, state)
+            if state in (JobStatus.State.ERROR, JobStatus.State.DONE, JobStatus.State.CANCELLED):
+                if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED):
+                    break
+                elif state == JobStatus.State.ERROR:
+                    raise AirflowException(f"Dataproc job execution failed {self.job_id}")
+                else:

Review Comment:
   I don't think you need this `else` block: in line 59 you only enter this branch for the `ERROR`, `DONE`, and `CANCELLED` states, so no other job state can ever reach it.
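Because the outer `if` already restricts the branch to the three terminal states, the inner `else` is unreachable. A sketch of the loop with one check per outcome and no dead branch; it uses a stand-in `State` enum and a fake `get_state` coroutine instead of `JobStatus.State` and `DataprocAsyncHook.get_job`, and yields plain dicts where the real trigger would yield `TriggerEvent`s.

```python
import asyncio
import enum
from typing import Any, AsyncIterator, Awaitable, Callable, Dict


class State(enum.Enum):
    """Stand-in for a subset of google.cloud.dataproc_v1.JobStatus.State."""

    RUNNING = "RUNNING"
    DONE = "DONE"
    CANCELLED = "CANCELLED"
    ERROR = "ERROR"


async def poll_job(
    job_id: str,
    get_state: Callable[[], Awaitable[State]],
    polling_period_seconds: float,
) -> AsyncIterator[Dict[str, Any]]:
    """Poll until the job reaches a terminal state, then yield one event."""
    while True:
        state = await get_state()
        if state is State.ERROR:
            raise RuntimeError(f"Dataproc job execution failed {job_id}")
        if state in (State.DONE, State.CANCELLED):
            # Terminal and not an error: report it and stop; no 'else' needed.
            yield {"job_id": job_id, "job_state": state.name}
            return
        # Any other state is non-terminal: wait before asking the API again.
        await asyncio.sleep(polling_period_seconds)
```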



##########
docs/apache-airflow-providers-google/operators/cloud/dataproc.rst:
##########
@@ -174,6 +174,14 @@ Example of the configuration for a Spark Job:
     :start-after: [START how_to_cloud_dataproc_spark_config]
     :end-before: [END how_to_cloud_dataproc_spark_config]
 
+Example of the configuration for a Spark Job running in `deferrable mode <https://airflow.apache.org/docs/apache-airflow/stable/concepts/deferring.html>`__:
+
+.. exampleinclude:: /../../tests/system/providers/google/dataproc/example_dataproc_spark_deferrable.py
+    :language: python
+    :dedent: 0

Review Comment:
   Why `:dedent: 0`? Shouldn't it be `:dedent: 4`?
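For context: Airflow's `exampleinclude` directive takes the same numeric `:dedent:` option as Sphinx's `literalinclude`, which strips that many leading characters from every included line. Which value is right depends on how the snippet between the START/END markers is indented: code inside a `with DAG(...):` block is usually indented four spaces, so `:dedent: 4` renders it flush left, whereas module-level code needs `:dedent: 0`. A small sketch of the behavior; `dedent_lines` here is an illustrative helper, and where Sphinx only logs a warning when non-whitespace would be stripped, this sketch raises.

```python
from typing import List


def dedent_lines(lines: List[str], dedent: int) -> List[str]:
    """Strip `dedent` leading characters from each line, like a numeric :dedent: option."""
    if any(line[:dedent].strip() for line in lines):
        # Sphinx logs a warning in this case; this sketch refuses instead.
        raise ValueError("dedent would remove non-whitespace characters")
    return [line[dedent:] for line in lines]
```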



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -0,0 +1,69 @@
+    def serialize(self):
+        return (
+            "airflow.providers.google.cloud.operators.dataproc.DataprocBaseTrigger",

Review Comment:
   This class path is wrong; it should be
   `airflow.providers.google.cloud.triggers.dataproc.DataprocBaseTrigger`.
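The first element of the tuple returned by `serialize()` is the dotted path the triggerer imports to rebuild the trigger, which it then calls with the serialized keyword arguments, so it should name the module where the class is defined (`triggers/dataproc.py`). A path through a module that merely re-imports the class can still resolve, which is why this kind of mistake is easy to miss. The sketch shows the resolution step with `importlib`; `import_class` and `rehydrate` are hypothetical helpers (Airflow has its own `import_string` utility), and standard-library names stand in for the trigger.

```python
import importlib
from typing import Any, Dict, Tuple


def import_class(classpath: str) -> type:
    """Import 'package.module.ClassName' and return the class object."""
    module_path, _, class_name = classpath.rpartition(".")
    module = importlib.import_module(module_path)  # ModuleNotFoundError on a bad module
    return getattr(module, class_name)  # AttributeError if the class is not there


def rehydrate(serialized: Tuple[str, Dict[str, Any]]) -> Any:
    """Rebuild an object from a (classpath, kwargs) pair, as a triggerer would."""
    classpath, kwargs = serialized
    return import_class(classpath)(**kwargs)
```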



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -0,0 +1,69 @@
+class DataprocBaseTrigger(BaseTrigger):
+    """
+    Trigger that periodically pollls information from Dataproc API to verify job status.

Review Comment:
   Please fix the spelling of `pollls` (should be `polls`). Also, the parameter is spelled `pooling_period_seconds`; it should probably be `polling_period_seconds`.
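Renaming a constructor parameter (for example `pooling_period_seconds` to `polling_period_seconds`) has to be mirrored in `serialize()`, because the triggerer re-creates the trigger by passing the serialized dict back as keyword arguments. A round-trip check like the one below catches a mismatch; `PollingTriggerSketch` and `round_trip` are hypothetical stand-ins, not the Dataproc trigger.

```python
from typing import Any, Dict, Tuple


class PollingTriggerSketch:
    def __init__(self, job_id: str, region: str, polling_period_seconds: int = 30) -> None:
        self.job_id = job_id
        self.region = region
        self.polling_period_seconds = polling_period_seconds

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        # Keys must match the __init__ parameter names exactly.
        return (
            f"{type(self).__module__}.{type(self).__qualname__}",
            {
                "job_id": self.job_id,
                "region": self.region,
                "polling_period_seconds": self.polling_period_seconds,
            },
        )


def round_trip(trigger: PollingTriggerSketch) -> PollingTriggerSketch:
    """Rebuild the trigger from its own serialized kwargs."""
    _, kwargs = trigger.serialize()
    return type(trigger)(**kwargs)
```

A unit test that asserts `round_trip(t).serialize() == t.serialize()` fails immediately if only one side of a rename is applied.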




[GitHub] [airflow] rajaths010494 commented on a diff in pull request #25302: Dataproc submit job operator async

Posted by GitBox <gi...@apache.org>.
rajaths010494 commented on code in PR #25302:
URL: https://github.com/apache/airflow/pull/25302#discussion_r934429065


##########
airflow/providers/google/cloud/operators/dataproc.py:
##########
@@ -958,6 +962,19 @@ def execute(self, context: 'Context'):
                 context=context, task_instance=self, url=DATAPROC_JOB_LOG_LINK, resource=job_id
             )
 
+            if self.deferrable:
+                self.defer(
+                    trigger=DataprocBaseTrigger(
+                        job_id=self.job_id,
+                        project_id=self.project_id,
+                        region=self.region,
+                        delegate_to=self.delegate_to,
+                        gcp_conn_id=self.gcp_conn_id,
+                        impersonation_chain=self.impersonation_chain,
+                        pooling_period_seconds=10,

Review Comment:
   Can we avoid hardcoding this value and take it from the user as a `polling_interval` parameter instead?
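A sketch of the suggestion: accept the interval as an operator argument (the operator docstrings in this PR call it `polling_interval_seconds`) and forward it to the trigger instead of the literal `10`. `TriggerArgs` and `DeferrableJobOperatorSketch` are hypothetical stand-ins, and rejecting non-positive values is an extra assumption of this sketch, not something the review asked for.

```python
from dataclasses import dataclass


@dataclass
class TriggerArgs:
    """Hypothetical stand-in for the trigger's constructor arguments."""

    job_id: str
    polling_period_seconds: float


class DeferrableJobOperatorSketch:
    def __init__(self, deferrable: bool = False, polling_interval_seconds: float = 10) -> None:
        if polling_interval_seconds <= 0:
            # Assumption: a non-positive interval would make the trigger poll without pausing.
            raise ValueError("polling_interval_seconds must be greater than 0")
        self.deferrable = deferrable
        self.polling_interval_seconds = polling_interval_seconds

    def build_trigger(self, job_id: str) -> TriggerArgs:
        # Forward the user's setting instead of a hardcoded value.
        return TriggerArgs(job_id=job_id, polling_period_seconds=self.polling_interval_seconds)
```

Keeping the default at `10` preserves the previous behavior for users who do not set the argument.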




[GitHub] [airflow] pankajastro commented on pull request #25302: Dataproc submit job operator async

Posted by GitBox <gi...@apache.org>.
pankajastro commented on PR #25302:
URL: https://github.com/apache/airflow/pull/25302#issuecomment-1203151897

   @potiuk Previously we created two separate operators/sensors, one synchronous and one asynchronous; for example, `DatabricksSubmitRunDeferrableOperator` is the async variant and `DatabricksSubmitRunOperator` the sync one. In this PR, however, we add an async flag to the existing operator instead. Would that be considered an inconsistency, and do we need to worry about it? https://github.com/apache/airflow/blob/main/airflow/providers/databricks/operators/databricks.py#L368 



[GitHub] [airflow] rajaths010494 commented on pull request #25302: Dataproc submit job operator async

Posted by GitBox <gi...@apache.org>.
rajaths010494 commented on PR #25302:
URL: https://github.com/apache/airflow/pull/25302#issuecomment-1201092932

   Test cases for the trigger are missing.
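A sketch of what trigger tests could look like, using `unittest.IsolatedAsyncioTestCase` and a fake hook so no Dataproc API is called. The trigger here is a hypothetical, simplified stand-in (`JobTrigger`, whose hook exposes a made-up `get_job_state` coroutine) rather than the PR's `DataprocBaseTrigger`; a test of the real class would more likely patch `DataprocAsyncHook.get_job`.

```python
import asyncio
import unittest
from typing import Any, AsyncIterator, Dict, List


class FakeHook:
    """Returns a scripted sequence of job states instead of calling the API."""

    def __init__(self, states: List[str]) -> None:
        self._states = iter(states)
        self.calls = 0

    async def get_job_state(self, job_id: str) -> str:
        self.calls += 1
        return next(self._states)


class JobTrigger:
    """Hypothetical simplified trigger under test."""

    def __init__(self, job_id: str, hook: FakeHook, polling_period_seconds: float = 0) -> None:
        self.job_id = job_id
        self.hook = hook
        self.polling_period_seconds = polling_period_seconds

    async def run(self) -> AsyncIterator[Dict[str, Any]]:
        while True:
            state = await self.hook.get_job_state(self.job_id)
            if state == "ERROR":
                raise RuntimeError(f"Dataproc job execution failed {self.job_id}")
            if state in ("DONE", "CANCELLED"):
                yield {"job_id": self.job_id, "job_state": state}
                return
            await asyncio.sleep(self.polling_period_seconds)


class TestJobTrigger(unittest.IsolatedAsyncioTestCase):
    async def test_yields_event_when_job_done(self) -> None:
        hook = FakeHook(["PENDING", "RUNNING", "DONE"])
        events = [e async for e in JobTrigger("j1", hook).run()]
        self.assertEqual(events, [{"job_id": "j1", "job_state": "DONE"}])
        # The fake hook also shows how many polls the trigger made.
        self.assertEqual(hook.calls, 3)

    async def test_raises_when_job_errors(self) -> None:
        trigger = JobTrigger("j2", FakeHook(["RUNNING", "ERROR"]))
        with self.assertRaises(RuntimeError):
            async for _ in trigger.run():
                pass
```

Run with `python -m unittest`; a zero polling period keeps the tests fast.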
   



[GitHub] [airflow] pankajastro commented on a diff in pull request #25302: Dataproc submit job operator async

Posted by GitBox <gi...@apache.org>.
pankajastro commented on code in PR #25302:
URL: https://github.com/apache/airflow/pull/25302#discussion_r936596677


##########
airflow/providers/google/cloud/operators/dataproc.py:
##########
@@ -867,6 +868,9 @@ class DataprocJobBaseOperator(BaseOperator):
     :param asynchronous: Flag to return after submitting the job to the Dataproc API.
         This is useful for submitting long running jobs and
         waiting on them asynchronously using the DataprocJobSensor
+    :param deferrable: Run operator in the deferrable mode
+    :param polling_interval_seconds time in seconds between polling for job completion.

Review Comment:
   ```suggestion
       :param polling_interval_seconds: time in seconds between polling for job completion.
   ```




[GitHub] [airflow] boring-cyborg[bot] commented on pull request #25302: Dataproc submit job operator async

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on PR #25302:
URL: https://github.com/apache/airflow/pull/25302#issuecomment-1195274969

   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst).
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, mypy and type annotations). Our [pre-commits](https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature, add useful documentation (in docstrings or in the `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using the [Breeze environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow the [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   



[GitHub] [airflow] potiuk commented on pull request #25302: Dataproc submit job operator async

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25302:
URL: https://github.com/apache/airflow/pull/25302#issuecomment-1207755957

   One doc failure left.



[GitHub] [airflow] potiuk commented on pull request #25302: Dataproc submit job operator async

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25302:
URL: https://github.com/apache/airflow/pull/25302#issuecomment-1203162397

   I am perfectly OK with per-provider consistency rather than "per-Airflow" consistency. There is no reason why we should "force" one way or the other across providers, and there might be reasons why one approach is easier for one provider and a different approach for another.
   
   Traditionally, each provider should follow its own "standards" and be consistent, for all things except Airflow "performance" and best practices. We are gearing up (in our tooling for now, but soon in the code) to split providers into individual repos, and then it will matter even less whether such a level of consistency is required.
   
   I personally think of this the same way the Apache Software Foundation does with its projects. There is a very, very small but super-strict interface that the "distributed" component (a project in the ASF, a provider in Airflow) must follow, and it should be strictly followed, but everything else should be left to be decided internally (by the project in the ASF and by the provider in Airflow).
   
   The things that we care about at the Airflow level should be very well described in https://github.com/apache/airflow/blob/main/README.md and strictly regulated by the processes/automation. Everything else should be decided by the people who are most active in the providers.



[GitHub] [airflow] potiuk commented on pull request #25302: Dataproc submit job operator async

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25302:
URL: https://github.com/apache/airflow/pull/25302#issuecomment-1203111255

   Some errors to fix.



[GitHub] [airflow] bjankie1 commented on pull request #25302: Dataproc submit job operator async

Posted by GitBox <gi...@apache.org>.
bjankie1 commented on PR #25302:
URL: https://github.com/apache/airflow/pull/25302#issuecomment-1199034585

   > Looks pretty cool, but we also need some examples and likely entries in the documentation describing the usage and mentioning deferrable options. Otherwise it will not be discoverable enough.
   
   Thank you for pointing that out. I've fixed it in #1bf260a.



[GitHub] [airflow] potiuk merged pull request #25302: Dataproc submit job operator async

Posted by GitBox <gi...@apache.org>.
potiuk merged PR #25302:
URL: https://github.com/apache/airflow/pull/25302

