Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/08/01 12:07:48 UTC

[GitHub] [airflow] bharanidharan14 commented on a diff in pull request #25302: Dataproc submit job operator async

bharanidharan14 commented on code in PR #25302:
URL: https://github.com/apache/airflow/pull/25302#discussion_r934452024


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -0,0 +1,69 @@
+import asyncio
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.dataproc import DataprocAsyncHook
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+from google.cloud.dataproc_v1 import JobStatus, Job
+from typing import Any, Dict, Optional, Sequence, Tuple, Union
+
+
+class DataprocBaseTrigger(BaseTrigger):
+    """
+    Trigger that periodically pollls information from Dataproc API to verify job status.
+    Implementation leverages asynchronous transport.
+    """
+
+    def __init__(
+        self,
+        job_id: str,
+        project_id: str,
+        region: str,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        delegate_to: Optional[str] = None,
+        pooling_period_seconds: int = 30,
+    ):
+        super().__init__()
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.job_id = job_id
+        self.project_id = project_id
+        self.region = region
+        self.pooling_period_seconds = pooling_period_seconds
+        self.delegate_to = delegate_to
+        self.hook = DataprocAsyncHook(
+            delegate_to=self.delegate_to,
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    def serialize(self):
+        return (
+            "airflow.providers.google.cloud.operators.dataproc.DataprocBaseTrigger",
+            {
+                "job_id": self.job_id,
+                "project_id": self.project_id,
+                "region": self.region,
+                "gcp_conn_id": self.gcp_conn_id,
+                "delegate_to": self.delegate_to,
+                "impersonation_chain": self.impersonation_chain,
+                "pooling_period_seconds": self.pooling_period_seconds,
+            },
+        )
+
+    async def run(self):
+        while True:
+            job = await self.hook.get_job(project_id=self.project_id, region=self.region, job_id=self.job_id)
+            state = job.status.state
+            self.log.info("Dataproc job: %s is in state: %s", self.job_id, state)
+            if state in (JobStatus.State.ERROR, JobStatus.State.DONE, JobStatus.State.CANCELLED):
+                if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED):
+                    break
+                elif state == JobStatus.State.ERROR:
+                    raise AirflowException(f"Dataproc job execution failed {self.job_id}")
+                else:

Review Comment:
   I guess you don't need this `else` block: on line 59 you already check that the state is one of `ERROR`, `DONE`, or `CANCELLED`, so no other job status can reach this branch.
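   For illustration, a minimal sketch of how the loop could read with that `else` removed (the final `yield` of a `TriggerEvent` is my assumption, based on the `TriggerEvent` import in this file; the quoted hunk cuts off before the loop exit):

   ```python
   async def run(self):
       while True:
           job = await self.hook.get_job(
               project_id=self.project_id, region=self.region, job_id=self.job_id
           )
           state = job.status.state
           self.log.info("Dataproc job: %s is in state: %s", self.job_id, state)
           if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED):
               break  # terminal, non-error states: stop polling
           if state == JobStatus.State.ERROR:
               raise AirflowException(f"Dataproc job execution failed {self.job_id}")
           # any non-terminal state: wait one period, then poll again
           await asyncio.sleep(self.pooling_period_seconds)
       yield TriggerEvent({"job_id": self.job_id, "job_state": state})
   ```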



##########
docs/apache-airflow-providers-google/operators/cloud/dataproc.rst:
##########
@@ -174,6 +174,14 @@ Example of the configuration for a Spark Job:
     :start-after: [START how_to_cloud_dataproc_spark_config]
     :end-before: [END how_to_cloud_dataproc_spark_config]
 
+Example of the configuration for a Spark Job running in `deferrable mode <https://airflow.apache.org/docs/apache-airflow/stable/concepts/deferring.html>`__:
+
+.. exampleinclude:: /../../tests/system/providers/google/dataproc/example_dataproc_spark_deferrable.py
+    :language: python
+    :dedent: 0

Review Comment:
   Why `:dedent: 0`? Shouldn't it be `:dedent: 4`?
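   For reference, the directive with the suggested value would read:

   ```rst
   .. exampleinclude:: /../../tests/system/providers/google/dataproc/example_dataproc_spark_deferrable.py
       :language: python
       :dedent: 4
   ```

   (whether `4` is right depends on how deeply the snippet is indented in the example file)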



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -0,0 +1,69 @@
+import asyncio
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.dataproc import DataprocAsyncHook
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+from google.cloud.dataproc_v1 import JobStatus, Job
+from typing import Any, Dict, Optional, Sequence, Tuple, Union
+
+
+class DataprocBaseTrigger(BaseTrigger):
+    """
+    Trigger that periodically pollls information from Dataproc API to verify job status.
+    Implementation leverages asynchronous transport.
+    """
+
+    def __init__(
+        self,
+        job_id: str,
+        project_id: str,
+        region: str,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        delegate_to: Optional[str] = None,
+        pooling_period_seconds: int = 30,
+    ):
+        super().__init__()
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.job_id = job_id
+        self.project_id = project_id
+        self.region = region
+        self.pooling_period_seconds = pooling_period_seconds
+        self.delegate_to = delegate_to
+        self.hook = DataprocAsyncHook(
+            delegate_to=self.delegate_to,
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    def serialize(self):
+        return (
+            "airflow.providers.google.cloud.operators.dataproc.DataprocBaseTrigger",

Review Comment:
   This class path is wrong; it should be `airflow.providers.google.cloud.triggers.dataproc.DataprocBaseTrigger`.
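   With the corrected path, `serialize` would look like this (same fields as in the quoted hunk):

   ```python
   def serialize(self) -> Tuple[str, Dict[str, Any]]:
       # The classpath must match the module the trigger actually lives in,
       # so the triggerer process can re-import and re-instantiate it.
       return (
           "airflow.providers.google.cloud.triggers.dataproc.DataprocBaseTrigger",
           {
               "job_id": self.job_id,
               "project_id": self.project_id,
               "region": self.region,
               "gcp_conn_id": self.gcp_conn_id,
               "delegate_to": self.delegate_to,
               "impersonation_chain": self.impersonation_chain,
               "pooling_period_seconds": self.pooling_period_seconds,
           },
       )
   ```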



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -0,0 +1,69 @@
+import asyncio
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.dataproc import DataprocAsyncHook
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+from google.cloud.dataproc_v1 import JobStatus, Job
+from typing import Any, Dict, Optional, Sequence, Tuple, Union
+
+
+class DataprocBaseTrigger(BaseTrigger):
+    """
+    Trigger that periodically pollls information from Dataproc API to verify job status.

Review Comment:
   Spelling: `pollls` should be `polls`. Also, the parameter is named `pooling_period_seconds`; it looks like `polling_period_seconds` was intended.
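   A sketch with both fixed (renaming the parameter to `polling_period_seconds` is my reading of the intent; if renamed, `serialize` and the deferring operator would need the same change):

   ```python
   class DataprocBaseTrigger(BaseTrigger):
       """
       Trigger that periodically polls information from the Dataproc API to verify job status.
       Implementation leverages asynchronous transport.
       """

       def __init__(
           self,
           job_id: str,
           project_id: str,
           region: str,
           gcp_conn_id: str = "google_cloud_default",
           impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
           delegate_to: Optional[str] = None,
           polling_period_seconds: int = 30,  # was pooling_period_seconds; rename is assumed intent
       ):
           ...
   ```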



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org