Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/10/21 18:39:07 UTC

[GitHub] [airflow] TobKed opened a new pull request #11726: Add DataflowJobStatusSensor

TobKed opened a new pull request #11726:
URL: https://github.com/apache/airflow/pull/11726


   Added `DataflowJobStatusSensor` to monitor the status of a given Dataflow job.
   Added the `wait_until_finished` parameter to Dataflow operators to allow asynchronous execution of batch pipelines.
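
   A minimal sketch of how the two pieces could be combined in a DAG, assuming the operator and sensor signatures shown in the review below; the template path and the XCom access used to pass the job ID are illustrative:

   ```python
   from airflow import models
   from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
   from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
   from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor
   from airflow.utils.dates import days_ago

   with models.DAG(
       "example_dataflow_async_sensor",
       start_date=days_ago(1),
       schedule_interval=None,
   ) as dag:
       # Submit the job without blocking until the pipeline finishes.
       start_job = DataflowTemplatedJobStartOperator(
           task_id="start-template-job",
           template="gs://dataflow-templates/latest/Word_Count",  # illustrative
           parameters={"inputFile": "gs://my-bucket/input.txt", "output": "gs://my-bucket/out"},
           location="us-central1",
           wait_until_finished=False,
       )
       # Poll the submitted job until it reaches the expected state.
       wait_for_job = DataflowJobStatusSensor(
           task_id="wait-for-job",
           job_id="{{ task_instance.xcom_pull('start-template-job')['id'] }}",  # illustrative XCom access
           expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
           location="us-central1",
       )
       start_job >> wait_for_job
   ```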


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r517609877



##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -324,6 +344,23 @@ class DataflowTemplatedJobStartOperator(BaseOperator):
             `https://cloud.google.com/dataflow/pipelines/specifying-exec-params
             <https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment>`__
     :type environment: Optional[dict]
+    :param wait_until_finished: (Optional)
+        If True, wait for the end of pipeline execution before exiting. If False,
+        it only waits for it to starts (``JOB_STATE_RUNNING``).
+
+        The default behavior depends on the type of pipeline:
+
+        * for the streaming pipeline, wait for jobs to start,
+        * for the batch pipeline, wait for the jobs to complete.
+
+        .. warning::
+
+            You cannot call ``PipelineResult.wait_until_finish`` method in your pipeline code for the operator

Review comment:
       The process of starting the Dataflow job in Airflow consists of two steps:
   - running a subprocess and reading the stdout/stderr logs for the job ID;
   - a loop waiting for the job with that ID to end. This loop checks the status of the job.
   
   Step two starts just after step one has finished, so if your pipeline code calls `wait_until_finish`, step two will not start until the subprocess stops. When the subprocess stops, step two will run, but it will only execute one iteration because the job will already be in a terminal state.
   
   If your pipeline does not call the `wait_until_finish` method but you pass `wait_until_finished=True` to the operator, the second loop will wait for the job's terminal state.
   
   If your pipeline does not call the `wait_until_finish` method and you pass `wait_until_finished=False` to the operator, the second loop will wait for the running state only.
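
   A rough sketch of the control flow described above (helper names are illustrative, not the hook's actual API):

   ```python
   # Terminal states here reflect the checks discussed in this thread.
   TERMINAL_STATES = {"JOB_STATE_DONE", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED"}

   def start_dataflow_job(cmd: list, wait_until_finished: bool) -> str:
       # Step one: run the pipeline as a subprocess and scrape the job ID
       # from its logs. If the pipeline code itself calls wait_until_finish,
       # this step blocks until the job reaches a terminal state.
       job_id = run_subprocess_and_read_job_id(cmd)  # hypothetical helper

       # Step two: poll the job status in a loop.
       while True:
           state = fetch_job_state(job_id)  # hypothetical helper
           if state in TERMINAL_STATES:
               return state
           # With wait_until_finished=False, return as soon as the job runs;
           # with True, keep polling until a terminal state.
           if not wait_until_finished and state == "JOB_STATE_RUNNING":
               return state
   ```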
   
   
   
   
   







[GitHub] [airflow] TobKed commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r522970986



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -147,9 +147,16 @@ class _DataflowJobsController(LoggingMixin):
         not by specific job ID, then actions will be performed on all matching jobs.
     :param drain_pipeline: Optional, set to True if want to stop streaming job by draining it
         instead of canceling.
+    :param wait_until_finished: If True, wait for the end of pipeline execution before exiting. If False,
+        it only waits for it to starts (``JOB_STATE_RUNNING``).
+
+        The default behavior depends on the type of pipeline:

Review comment:
       @aaltay I changed the behavior for `wait_until_finished=False`.
   Now it will not wait for the job to be in the running state; that can be checked by the sensor.
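
   For example, the running state could then be checked with the new sensor (a sketch; the job ID is illustrative):

   ```python
   from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
   from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor

   wait_for_job_running = DataflowJobStatusSensor(
       task_id="wait-for-job-running",
       job_id="<dataflow-job-id>",  # illustrative
       expected_statuses={DataflowJobStatus.JOB_STATE_RUNNING},
       location="us-central1",
   )
   ```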







[GitHub] [airflow] github-actions[bot] commented on pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#issuecomment-721653779


   [The Workflow run](https://github.com/apache/airflow/actions/runs/345352884) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] TobKed commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r517382066



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -266,16 +283,18 @@ def _check_dataflow_job_state(self, job) -> bool:
         :rtype: bool
         :raise: Exception
         """
+        if self._wait_until_finished is None:
+            wait_until_finished = DataflowJobType.JOB_TYPE_STREAMING != job['type']

Review comment:
       Thanks. Fixed.







[GitHub] [airflow] aaltay commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
aaltay commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r521613015



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -278,18 +296,20 @@ def _check_dataflow_job_state(self, job) -> bool:
         :rtype: bool
         :raise: Exception
         """
-        if DataflowJobStatus.JOB_STATE_DONE == job["currentState"]:
+        if self._wait_until_finished is None:
+            wait_until_finished = job['type'] != DataflowJobType.JOB_TYPE_STREAMING
+        else:
+            wait_until_finished = self._wait_until_finished
+
+        if job['currentState'] == DataflowJobStatus.JOB_STATE_DONE:

Review comment:
       `DRAINED` and `UPDATED` also need to be in this list. They are also terminal states.
   
   (https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#jobstate)
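
   A sketch of a terminal-state set including those states (state names from the linked JobState reference):

   ```python
   # Terminal Dataflow job states, per
   # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#jobstate
   TERMINAL_STATES = {
       "JOB_STATE_DONE",
       "JOB_STATE_FAILED",
       "JOB_STATE_CANCELLED",
       "JOB_STATE_DRAINED",
       "JOB_STATE_UPDATED",
   }

   def is_job_terminal(state: str) -> bool:
       return state in TERMINAL_STATES
   ```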

##########
File path: airflow/providers/google/cloud/sensors/dataflow.py
##########
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Cloud Dataflow sensor."""
+from typing import Optional, Sequence, Set, Union
+
+from airflow.exceptions import AirflowException
+from airflow.providers.google.cloud.hooks.dataflow import (
+    DEFAULT_DATAFLOW_LOCATION,
+    DataflowHook,
+    DataflowJobStatus,
+)
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DataflowJobStatusSensor(BaseSensorOperator):
+    """
+    Checks for the status of a job in Google Cloud Dataflow.
+
+    :param job_id: ID of the job to be checked.
+    :type job_id: str
+    :param expected_statuses: The expected state of the operation.
+        See:
+        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
+    :type expected_statuses: Union[Set[str], str]
+    :param project_id: Optional, the Google Cloud project ID in which to start a job.
+        If set to None or missing, the default project_id from the Google Cloud connection is used.
+    :type project_id: str
+    :param location: The location of the Dataflow job (for example europe-west1). See:
+        https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
+    :type location: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled. See:
+        https://developers.google.com/identity/protocols/oauth2/service-account#delegatingauthority
+    :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = ['job_id']
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        job_id: str,
+        expected_statuses: Union[Set[str], str],
+        project_id: Optional[str] = None,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.job_id = job_id
+        self.expected_statuses = (
+            {expected_statuses} if isinstance(expected_statuses, str) else expected_statuses
+        )
+        self.project_id = project_id
+        self.location = location
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+        self.hook: Optional[DataflowHook] = None
+
+    def poke(self, context: dict) -> bool:

Review comment:
       How often will this be executed? It would be good to wait a reasonable amount of time between subsequent checks.

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -147,9 +147,16 @@ class _DataflowJobsController(LoggingMixin):
         not by specific job ID, then actions will be performed on all matching jobs.
     :param drain_pipeline: Optional, set to True if want to stop streaming job by draining it
         instead of canceling.
+    :param wait_until_finished: If True, wait for the end of pipeline execution before exiting. If False,
+        it only waits for it to starts (``JOB_STATE_RUNNING``).
+
+        The default behavior depends on the type of pipeline:

Review comment:
       "If False, it only waits for it to starts (``JOB_STATE_RUNNING``)" ->
   
   This might wait a long time (hours+). A job might be created and scheduled to run later (e.g. flex resource scheduling).
   
   Suggestion: if False, submit the job and return without waiting.







[GitHub] [airflow] mik-laj commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r510455340



##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -132,6 +132,15 @@ class DataflowCreateJavaJobOperator(BaseOperator):
     :type check_if_running: CheckJobRunning(IgnoreJob = do not check if running, FinishIfRunning=
         if job is running finish with nothing, WaitForRun= wait until job finished and the run job)
         ``jar``, ``options``, and ``job_name`` are templated so you can use variables in them.
+    :param wait_until_finished: (Optional)

Review comment:
       You made the description a bit complicated.
   
   ```
   If True, wait for the end of pipeline execution before exiting. If False, it only waits for it to start (``JOB_STATE_RUNNING``).
   
   The default behavior depends on the type of pipeline:
   * for the streaming pipeline, wait for jobs to start,
   * for the batch pipeline, wait for the jobs to complete.
   
   .. warning::
   
       You cannot call the ``PipelineResult.wait_until_finish`` method in your pipeline code for the operator to work properly, i.e. you must use asynchronous execution. Otherwise, your pipeline will always wait until finished. For more information, look at: `Asynchronous execution <https://cloud.google.com/dataflow/docs/guides/specifying-exec-params#python_10>`__
   ```







[GitHub] [airflow] github-actions[bot] commented on pull request #11726: Add DataflowJobStatusSensor

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#issuecomment-713840373


   [The Workflow run](https://github.com/apache/airflow/actions/runs/320623917) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks$,^Build docs$,^Spell check docs$,^Backport packages$,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] TobKed commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r517381169



##########
File path: airflow/providers/google/cloud/sensors/dataflow.py
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Bigquery sensor."""

Review comment:
       Fixed. Thanks!







[GitHub] [airflow] TobKed commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r510374012



##########
File path: airflow/providers/google/cloud/sensors/dataflow.py
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Bigquery sensor."""
+from typing import Optional, Sequence, Union, Set
+
+from airflow.exceptions import AirflowException
+
+from airflow.providers.google.cloud.hooks.dataflow import (
+    DataflowHook,
+    DEFAULT_DATAFLOW_LOCATION,
+    DataflowJobStatus,
+)
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DataflowJobStatusSensor(BaseSensorOperator):
+    """
+    Checks for the status of a job in Google Dataflow.
+
+    :param job_id: ID of the job to be checked.
+    :type job_id: str
+    :param expected_statuses: The expected state of the operation.
+        See:
+        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
+    :type expected_statuses: Union[Set[str], str]
+    :param project_id: Optional, the Google Cloud project ID in which to start a job.
+        If set to None or missing, the default project_id from the Google Cloud connection is used.
+    :type project_id: str
+    :param location: Job location.
+    :type location: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = ['job_id']
+
+    @apply_defaults
+    def __init__(

Review comment:
       Should be in order now :)

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -905,3 +910,30 @@ def cancel_job(
             poll_sleep=self.poll_sleep,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_job(
+        self,
+        job_id: str,
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+    ) -> dict:
+        """
+        Gets the job with the specified Job ID.
+
+        :param job_id: Job ID to get.
+        :type job_id: str
+        :param project_id: Optional, the Google Cloud project ID in which to start a job.
+            If set to None or missing, the default project_id from the Google Cloud connection is used.
+        :type project_id:
+        :param location: Job location.
+        :type location: str
+        :return: the Job
+        :rtype: dict
+        """
+        jobs_controller = _DataflowJobsController(
+            dataflow=self.get_conn(),
+            project_number=project_id,
+            location=location,
+        )
+        return jobs_controller._fetch_job_by_id(job_id)  # pylint: disable=protected-access

Review comment:
       Fixed.
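
   For reference, a usage sketch of the hook method under review (the project and job IDs are illustrative):

   ```python
   from airflow.providers.google.cloud.hooks.dataflow import DataflowHook

   hook = DataflowHook(gcp_conn_id="google_cloud_default")
   job = hook.get_job(
       job_id="<dataflow-job-id>",   # illustrative
       project_id="my-gcp-project",  # illustrative
       location="us-central1",
   )
   print(job["currentState"])
   ```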







[GitHub] [airflow] TobKed commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r517381323



##########
File path: airflow/providers/google/cloud/sensors/dataflow.py
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Bigquery sensor."""
+from typing import Optional, Sequence, Union, Set
+
+from airflow.exceptions import AirflowException
+
+from airflow.providers.google.cloud.hooks.dataflow import (
+    DataflowHook,
+    DEFAULT_DATAFLOW_LOCATION,
+    DataflowJobStatus,
+)
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DataflowJobStatusSensor(BaseSensorOperator):
+    """
+    Checks for the status of a job in Google Dataflow.
+
+    :param job_id: ID of the job to be checked.
+    :type job_id: str
+    :param expected_statuses: The expected state of the operation.
+        See:
+        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
+    :type expected_statuses: Union[Set[str], str]
+    :param project_id: Optional, the Google Cloud project ID in which to start a job.
+        If set to None or missing, the default project_id from the Google Cloud connection is used.
+    :type project_id: str
+    :param location: Job location.
+    :type location: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,

Review comment:
       Added. Thanks.







[GitHub] [airflow] TobKed commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r516797979



##########
File path: airflow/providers/google/cloud/example_dags/example_dataflow.py
##########
@@ -127,6 +130,40 @@
         py_interpreter='python3',
         py_system_site_packages=False,
     )
+    # [END howto_operator_start_python_job]
+
+
+with models.DAG(
+    "example_gcp_dataflow_native_python_async",
+    default_args=default_args,
+    start_date=days_ago(1),
+    schedule_interval=None,  # Override to match your needs

Review comment:
       IMHO a link to the documentation is not necessary here because `schedule_interval` is documented in the `DAG` class.







[GitHub] [airflow] ibzib commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r516185504



##########
File path: airflow/providers/google/cloud/example_dags/example_dataflow.py
##########
@@ -127,6 +130,40 @@
         py_interpreter='python3',
         py_system_site_packages=False,
     )
+    # [END howto_operator_start_python_job]
+
+
+with models.DAG(
+    "example_gcp_dataflow_native_python_async",
+    default_args=default_args,
+    start_date=days_ago(1),
+    schedule_interval=None,  # Override to match your needs

Review comment:
       Should we link to some documentation for `schedule_interval`?

##########
File path: airflow/providers/google/cloud/example_dags/example_dataflow.py
##########
@@ -127,6 +130,40 @@
         py_interpreter='python3',
         py_system_site_packages=False,
     )
+    # [END howto_operator_start_python_job]
+
+
+with models.DAG(
+    "example_gcp_dataflow_native_python_async",
+    default_args=default_args,
+    start_date=days_ago(1),
+    schedule_interval=None,  # Override to match your needs
+    tags=['example'],
+) as dag_native_python_async:
+    start_python_job_async = DataflowCreatePythonJobOperator(
+        task_id="start-python-job-async",
+        py_file=GCS_PYTHON,
+        py_options=[],
+        job_name='{{task.task_id}}',
+        options={
+            'output': GCS_OUTPUT,
+        },
+        py_requirements=['apache-beam[gcp]==2.21.0'],

Review comment:
       Why Beam 2.21.0?

##########
File path: airflow/providers/google/cloud/sensors/dataflow.py
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Bigquery sensor."""
+from typing import Optional, Sequence, Union, Set
+
+from airflow.exceptions import AirflowException
+
+from airflow.providers.google.cloud.hooks.dataflow import (
+    DataflowHook,
+    DEFAULT_DATAFLOW_LOCATION,
+    DataflowJobStatus,
+)
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DataflowJobStatusSensor(BaseSensorOperator):
+    """
+    Checks for the status of a job in Google Dataflow.
+
+    :param job_id: ID of the job to be checked.
+    :type job_id: str
+    :param expected_statuses: The expected state of the operation.
+        See:
+        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
+    :type expected_statuses: Union[Set[str], str]
+    :param project_id: Optional, the Google Cloud project ID in which to start a job.
+        If set to None or missing, the default project_id from the Google Cloud connection is used.
+    :type project_id: str
+    :param location: Job location.
+    :type location: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,

Review comment:
       Links to GCP documentation for these fields would also be useful.

##########
File path: airflow/providers/google/cloud/sensors/dataflow.py
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Bigquery sensor."""
+from typing import Optional, Sequence, Union, Set
+
+from airflow.exceptions import AirflowException
+
+from airflow.providers.google.cloud.hooks.dataflow import (
+    DataflowHook,
+    DEFAULT_DATAFLOW_LOCATION,
+    DataflowJobStatus,
+)
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DataflowJobStatusSensor(BaseSensorOperator):
+    """
+    Checks for the status of a job in Google Dataflow.
+
+    :param job_id: ID of the job to be checked.
+    :type job_id: str
+    :param expected_statuses: The expected state of the operation.
+        See:
+        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
+    :type expected_statuses: Union[Set[str], str]
+    :param project_id: Optional, the Google Cloud project ID in which to start a job.
+        If set to None or missing, the default project_id from the Google Cloud connection is used.
+    :type project_id: str
+    :param location: Job location.

Review comment:
       ditto

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -147,9 +147,16 @@ class _DataflowJobsController(LoggingMixin):
         not by specific job ID, then actions will be performed on all matching jobs.
     :param drain_pipeline: Optional, set to True if want to stop streaming job by draining it
         instead of canceling.
+    :param wait_until_finished: If True, wait for the end of pipeline execution before exiting. If False,
+        it only waits for it to starts (``JOB_STATE_RUNNING``).
+
+        The default behavior depends on the type of pipeline:

Review comment:
       This seems reasonable.

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -266,16 +283,18 @@ def _check_dataflow_job_state(self, job) -> bool:
         :rtype: bool
         :raise: Exception
         """
+        if self._wait_until_finished is None:
+            wait_until_finished = DataflowJobType.JOB_TYPE_STREAMING != job['type']

Review comment:
       Nit: avoid [Yoda conditions](https://en.wikipedia.org/wiki/Yoda_conditions).

##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -324,6 +344,23 @@ class DataflowTemplatedJobStartOperator(BaseOperator):
             `https://cloud.google.com/dataflow/pipelines/specifying-exec-params
             <https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment>`__
     :type environment: Optional[dict]
+    :param wait_until_finished: (Optional)
+        If True, wait for the end of pipeline execution before exiting. If False,
+        it only waits for it to starts (``JOB_STATE_RUNNING``).
+
+        The default behavior depends on the type of pipeline:
+
+        * for the streaming pipeline, wait for jobs to start,
+        * for the batch pipeline, wait for the jobs to complete.
+
+        .. warning::
+
+            You cannot call ``PipelineResult.wait_until_finish`` method in your pipeline code for the operator
+            to work properly. i. e. you must use asynchronous execution. Otherwise, your pipeline will
+            always wait until finished. For more information, look at:

Review comment:
       By "always wait until finished," do you mean it will block forever?
   
   Why does this happen?

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -920,3 +942,30 @@ def cancel_job(
             drain_pipeline=self.drain_pipeline,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_job(
+        self,
+        job_id: str,
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+    ) -> dict:
+        """
+        Gets the job with the specified Job ID.
+
+        :param job_id: Job ID to get.
+        :type job_id: str
+        :param project_id: Optional, the Google Cloud project ID in which to start a job.
+            If set to None or missing, the default project_id from the Google Cloud connection is used.
+        :type project_id:
+        :param location: Job location.

Review comment:
       Clarify what "location" means (it's the regional endpoint) and provide an example value (`us-central1`). Add a link to https://cloud.google.com/dataflow/docs/concepts/regional-endpoints.
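
   The requested wording could look like the docstring that later appears in the sensor:

   ```
   :param location: The location of the Dataflow job (for example europe-west1). See:
       https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
   :type location: str
   ```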

##########
File path: airflow/providers/google/cloud/sensors/dataflow.py
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Bigquery sensor."""
+from typing import Optional, Sequence, Union, Set
+
+from airflow.exceptions import AirflowException
+
+from airflow.providers.google.cloud.hooks.dataflow import (
+    DataflowHook,
+    DEFAULT_DATAFLOW_LOCATION,
+    DataflowJobStatus,
+)
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DataflowJobStatusSensor(BaseSensorOperator):
+    """
+    Checks for the status of a job in Google Dataflow.

Review comment:
       Nit: branding
   
   ```suggestion
       Checks for the status of a job in Google Cloud Dataflow.
   ```

##########
File path: airflow/providers/google/cloud/sensors/dataflow.py
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Bigquery sensor."""

Review comment:
       ```suggestion
   """This module contains a Google Cloud Dataflow sensor."""
   ```

##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -324,6 +344,23 @@ class DataflowTemplatedJobStartOperator(BaseOperator):
             `https://cloud.google.com/dataflow/pipelines/specifying-exec-params
             <https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment>`__
     :type environment: Optional[dict]
+    :param wait_until_finished: (Optional)
+        If True, wait for the end of pipeline execution before exiting. If False,
+        it only waits for it to starts (``JOB_STATE_RUNNING``).
+
+        The default behavior depends on the type of pipeline:
+
+        * for the streaming pipeline, wait for jobs to start,
+        * for the batch pipeline, wait for the jobs to complete.
+
+        .. warning::
+
+            You cannot call ``PipelineResult.wait_until_finish`` method in your pipeline code for the operator

Review comment:
       Note that `wait_until_finish` is called implicitly by `with Pipeline() as p:` as well. Since `with` is the recommended way to run pipelines, I am hesitant about this part, especially since the user likely has not even written the template's pipeline code themselves.
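
   For illustration, the two Beam idioms being contrasted (a sketch using standard apache_beam usage):

   ```python
   import apache_beam as beam
   from apache_beam.options.pipeline_options import PipelineOptions

   options = PipelineOptions()

   # Recommended idiom: the context manager runs the pipeline and implicitly
   # calls result.wait_until_finish() on exit, i.e. it blocks until the end.
   with beam.Pipeline(options=options) as p:
       p | beam.Create([1, 2, 3])

   # Asynchronous alternative: run() returns a PipelineResult immediately
   # and the job keeps running without blocking the caller.
   p = beam.Pipeline(options=options)
   p | beam.Create([1, 2, 3])
   result = p.run()  # do not call result.wait_until_finish()
   ```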







[GitHub] [airflow] github-actions[bot] commented on pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#issuecomment-724259128


   [The Workflow run](https://github.com/apache/airflow/actions/runs/354679044) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] mik-laj commented on a change in pull request #11726: Add DataflowJobStatusSensor

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r509663487



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -268,9 +271,8 @@ def _check_dataflow_job_state(self, job) -> bool:
             raise Exception("Google Cloud Dataflow job {} has failed.".format(job['name']))
         elif DataflowJobStatus.JOB_STATE_CANCELLED == job['currentState']:
             raise Exception("Google Cloud Dataflow job {} was cancelled.".format(job['name']))
-        elif (
-            DataflowJobStatus.JOB_STATE_RUNNING == job['currentState']
-            and DataflowJobType.JOB_TYPE_STREAMING == job['type']
+        elif DataflowJobStatus.JOB_STATE_RUNNING == job['currentState'] and (

Review comment:
       This is helpful as the user will be able to start the streaming job and then stop it from the Web UI.







[GitHub] [airflow] github-actions[bot] commented on pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#issuecomment-721654000


   [The Workflow run](https://github.com/apache/airflow/actions/runs/345353094) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] TobKed commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r517235878



##########
File path: airflow/providers/google/cloud/sensors/dataflow.py
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Bigquery sensor."""
+from typing import Optional, Sequence, Union, Set
+
+from airflow.exceptions import AirflowException
+
+from airflow.providers.google.cloud.hooks.dataflow import (
+    DataflowHook,
+    DEFAULT_DATAFLOW_LOCATION,
+    DataflowJobStatus,
+)
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DataflowJobStatusSensor(BaseSensorOperator):
+    """
+    Checks for the status of a job in Google Dataflow.

Review comment:
       Thanks! Fixed.







[GitHub] [airflow] TobKed edited a comment on pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
TobKed edited a comment on pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#issuecomment-727619991


   Thanks @aaltay and @potiuk :)
   Rebased on the latest master.





[GitHub] [airflow] mik-laj commented on a change in pull request #11726: Add DataflowJobStatusSensor

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r509662589



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -268,9 +271,8 @@ def _check_dataflow_job_state(self, job) -> bool:
             raise Exception("Google Cloud Dataflow job {} has failed.".format(job['name']))
         elif DataflowJobStatus.JOB_STATE_CANCELLED == job['currentState']:
             raise Exception("Google Cloud Dataflow job {} was cancelled.".format(job['name']))
-        elif (
-            DataflowJobStatus.JOB_STATE_RUNNING == job['currentState']
-            and DataflowJobType.JOB_TYPE_STREAMING == job['type']
+        elif DataflowJobStatus.JOB_STATE_RUNNING == job['currentState'] and (

Review comment:
       I've thought about it a bit longer, and I think we can provide a little more flexibility here if we don't set a single default for `wait_until_finished`.
   What do you think about implementing the following rules?
   
   - If the user has passed the `wait_until_finished` parameter, use this value.
   - If the `wait_until_finished` parameter is empty and the job is of the streaming type, do NOT wait for the finish.
   - If the `wait_until_finished` parameter is empty and the job is of the batch type, wait for the finish.
   
   ```python
   if self._wait_until_finished is None:
       wait_until_finished = DataflowJobType.JOB_TYPE_STREAMING != job['type']
   else:
       wait_until_finished = self._wait_until_finished
   ```
   







[GitHub] [airflow] TobKed commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r522316939



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -278,18 +296,20 @@ def _check_dataflow_job_state(self, job) -> bool:
         :rtype: bool
         :raise: Exception
         """
-        if DataflowJobStatus.JOB_STATE_DONE == job["currentState"]:
+        if self._wait_until_finished is None:
+            wait_until_finished = job['type'] != DataflowJobType.JOB_TYPE_STREAMING
+        else:
+            wait_until_finished = self._wait_until_finished
+
+        if job['currentState'] == DataflowJobStatus.JOB_STATE_DONE:

Review comment:
       Thanks! Just added it here: https://github.com/apache/airflow/pull/11726/commits/d590f9221967b73d43780d9aeb35cd96b2d2ca28







[GitHub] [airflow] ibzib commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r518877254



##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -324,6 +344,23 @@ class DataflowTemplatedJobStartOperator(BaseOperator):
             `https://cloud.google.com/dataflow/pipelines/specifying-exec-params
             <https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment>`__
     :type environment: Optional[dict]
+    :param wait_until_finished: (Optional)
+        If True, wait for the end of pipeline execution before exiting. If False,
+        it only waits for it to starts (``JOB_STATE_RUNNING``).
+
+        The default behavior depends on the type of pipeline:
+
+        * for the streaming pipeline, wait for jobs to start,
+        * for the batch pipeline, wait for the jobs to complete.
+
+        .. warning::
+
+            You cannot call ``PipelineResult.wait_until_finish`` method in your pipeline code for the operator

Review comment:
       Thanks for the explanation @mik-laj. The behaviors you describe seem acceptable.
   
   @TobKed Can you include details like Kamil's comment in the pydocs?







[GitHub] [airflow] TobKed commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r517380528



##########
File path: airflow/providers/google/cloud/sensors/dataflow.py
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Bigquery sensor."""
+from typing import Optional, Sequence, Union, Set
+
+from airflow.exceptions import AirflowException
+
+from airflow.providers.google.cloud.hooks.dataflow import (
+    DataflowHook,
+    DEFAULT_DATAFLOW_LOCATION,
+    DataflowJobStatus,
+)
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DataflowJobStatusSensor(BaseSensorOperator):
+    """
+    Checks for the status of a job in Google Dataflow.
+
+    :param job_id: ID of the job to be checked.
+    :type job_id: str
+    :param expected_statuses: The expected state of the operation.
+        See:
+        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
+    :type expected_statuses: Union[Set[str], str]
+    :param project_id: Optional, the Google Cloud project ID in which to start a job.
+        If set to None or missing, the default project_id from the Google Cloud connection is used.
+    :type project_id: str
+    :param location: Job location.

Review comment:
       Added.

##########
File path: airflow/providers/google/cloud/sensors/dataflow.py
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Bigquery sensor."""
+from typing import Optional, Sequence, Union, Set
+
+from airflow.exceptions import AirflowException
+
+from airflow.providers.google.cloud.hooks.dataflow import (
+    DataflowHook,
+    DEFAULT_DATAFLOW_LOCATION,
+    DataflowJobStatus,
+)
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DataflowJobStatusSensor(BaseSensorOperator):
+    """
+    Checks for the status of a job in Google Dataflow.

Review comment:
       Fixed. Thanks!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
TobKed commented on pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#issuecomment-714683077


   PTAL @mik-laj, I made the changes you suggested and some additional ones.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
TobKed commented on pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#issuecomment-727619991


   Thanks @potiuk :)
   Rebased on the latest master.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r517235606



##########
File path: airflow/providers/google/cloud/sensors/dataflow.py
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Bigquery sensor."""

Review comment:
       Thanks! Fixed.

##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -266,16 +283,18 @@ def _check_dataflow_job_state(self, job) -> bool:
         :rtype: bool
         :raise: Exception
         """
+        if self._wait_until_finished is None:
+            wait_until_finished = DataflowJobType.JOB_TYPE_STREAMING != job['type']

Review comment:
       Thanks! Fixed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r520091367



##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -324,6 +344,23 @@ class DataflowTemplatedJobStartOperator(BaseOperator):
             `https://cloud.google.com/dataflow/pipelines/specifying-exec-params
             <https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment>`__
     :type environment: Optional[dict]
+    :param wait_until_finished: (Optional)
+        If True, wait for the end of pipeline execution before exiting. If False,
+        it only waits for it to start (``JOB_STATE_RUNNING``).
+
+        The default behavior depends on the type of pipeline:
+
+        * for the streaming pipeline, wait for jobs to start,
+        * for the batch pipeline, wait for the jobs to complete.
+
+        .. warning::
+
+            You cannot call ``PipelineResult.wait_until_finish`` method in your pipeline code for the operator

Review comment:
       Thanks @mik-laj!
    @ibzib sure, just pushed a fixup with improved docs :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #11726: Add DataflowJobStatusSensor

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r509568774



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -905,3 +910,30 @@ def cancel_job(
             poll_sleep=self.poll_sleep,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_job(
+        self,
+        job_id: str,
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+    ) -> dict:
+        """
+        Gets the job with the specified Job ID.
+
+        :param job_id: Job ID to get.
+        :type job_id: str
+        :param project_id: Optional, the Google Cloud project ID in which to start a job.
+            If set to None or missing, the default project_id from the Google Cloud connection is used.
+        :type project_id:
+        :param location: Job location.
+        :type location: str
+        :return: the Job
+        :rtype: dict
+        """
+        jobs_controller = _DataflowJobsController(
+            dataflow=self.get_conn(),
+            project_number=project_id,
+            location=location,
+        )
+        return jobs_controller._fetch_job_by_id(job_id)  # pylint: disable=protected-access

Review comment:
       ```suggestion
        return jobs_controller.fetch_job_by_id(job_id)
   ```
   This method should be marked public because it is used by an external class.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#issuecomment-726987222


   Needs rebase, and I think we can include it in the next release of backport providers (which I am planning to make a release candidate of on Monday).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
TobKed commented on pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#issuecomment-724712256


   Thanks @ibzib :)
   I rebased on the latest master
   cc @mik-laj 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r510332168



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -268,9 +271,8 @@ def _check_dataflow_job_state(self, job) -> bool:
             raise Exception("Google Cloud Dataflow job {} has failed.".format(job['name']))
         elif DataflowJobStatus.JOB_STATE_CANCELLED == job['currentState']:
             raise Exception("Google Cloud Dataflow job {} was cancelled.".format(job['name']))
-        elif (
-            DataflowJobStatus.JOB_STATE_RUNNING == job['currentState']
-            and DataflowJobType.JOB_TYPE_STREAMING == job['type']
+        elif DataflowJobStatus.JOB_STATE_RUNNING == job['currentState'] and (

Review comment:
       If I understand correctly, this allows us to preserve backward compatibility and gives the user great configuration options. Awesome.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r517235953



##########
File path: airflow/providers/google/cloud/sensors/dataflow.py
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Bigquery sensor."""
+from typing import Optional, Sequence, Union, Set
+
+from airflow.exceptions import AirflowException
+
+from airflow.providers.google.cloud.hooks.dataflow import (
+    DataflowHook,
+    DEFAULT_DATAFLOW_LOCATION,
+    DataflowJobStatus,
+)
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DataflowJobStatusSensor(BaseSensorOperator):
+    """
+    Checks for the status of a job in Google Dataflow.
+
+    :param job_id: ID of the job to be checked.
+    :type job_id: str
+    :param expected_statuses: The expected state of the operation.
+        See:
+        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
+    :type expected_statuses: Union[Set[str], str]
+    :param project_id: Optional, the Google Cloud project ID in which to start a job.
+        If set to None or missing, the default project_id from the Google Cloud connection is used.
+    :type project_id: str
+    :param location: Job location.
+    :type location: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,

Review comment:
       Thanks! Added.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r517235407



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -920,3 +942,30 @@ def cancel_job(
             drain_pipeline=self.drain_pipeline,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_job(
+        self,
+        job_id: str,
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+    ) -> dict:
+        """
+        Gets the job with the specified Job ID.
+
+        :param job_id: Job ID to get.
+        :type job_id: str
+        :param project_id: Optional, the Google Cloud project ID in which to start a job.
+            If set to None or missing, the default project_id from the Google Cloud connection is used.
+        :type project_id:
+        :param location: Job location.

Review comment:
       Thanks! Added.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r521835489



##########
File path: airflow/providers/google/cloud/sensors/dataflow.py
##########
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Cloud Dataflow sensor."""
+from typing import Optional, Sequence, Set, Union
+
+from airflow.exceptions import AirflowException
+from airflow.providers.google.cloud.hooks.dataflow import (
+    DEFAULT_DATAFLOW_LOCATION,
+    DataflowHook,
+    DataflowJobStatus,
+)
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DataflowJobStatusSensor(BaseSensorOperator):
+    """
+    Checks for the status of a job in Google Cloud Dataflow.
+
+    :param job_id: ID of the job to be checked.
+    :type job_id: str
+    :param expected_statuses: The expected state of the operation.
+        See:
+        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
+    :type expected_statuses: Union[Set[str], str]
+    :param project_id: Optional, the Google Cloud project ID in which to start a job.
+        If set to None or missing, the default project_id from the Google Cloud connection is used.
+    :type project_id: str
+    :param location: The location of the Dataflow job (for example europe-west1). See:
+        https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
+    :type location: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled. See:
+        https://developers.google.com/identity/protocols/oauth2/service-account#delegatingauthority
+    :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = ['job_id']
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        job_id: str,
+        expected_statuses: Union[Set[str], str],
+        project_id: Optional[str] = None,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.job_id = job_id
+        self.expected_statuses = (
+            {expected_statuses} if isinstance(expected_statuses, str) else expected_statuses
+        )
+        self.project_id = project_id
+        self.location = location
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+        self.hook: Optional[DataflowHook] = None
+
+    def poke(self, context: dict) -> bool:

Review comment:
       Sensors are a native Airflow feature, and the user can influence their behavior. One way is the `poke_interval` argument, which is set to 60 seconds by default.
   
   https://airflow.readthedocs.io/en/latest/_api/airflow/sensors/base_sensor_operator/index.html?highlight=base_sensor_operator#airflow.sensors.base_sensor_operator.BaseSensorOperator
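    
    For example, a minimal sketch of tuning how often the sensor checks the job (the job ID, location, and expected statuses here are placeholder values):
    
    ```python
    from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
    from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor
    
    wait_for_job = DataflowJobStatusSensor(
        task_id="wait-for-dataflow-job",
        job_id="2020-11-01_00_00_00-1234567890123456789",  # placeholder job ID
        expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
        location="europe-west3",
        poke_interval=120,  # check every 2 minutes instead of the 60-second default
    )
    ```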




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r517399496



##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -324,6 +344,23 @@ class DataflowTemplatedJobStartOperator(BaseOperator):
             `https://cloud.google.com/dataflow/pipelines/specifying-exec-params
             <https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment>`__
     :type environment: Optional[dict]
+    :param wait_until_finished: (Optional)
+        If True, wait for the end of pipeline execution before exiting. If False,
+        it only waits for it to start (``JOB_STATE_RUNNING``).
+
+        The default behavior depends on the type of pipeline:
+
+        * for the streaming pipeline, wait for jobs to start,
+        * for the batch pipeline, wait for the jobs to complete.
+
+        .. warning::
+
+            You cannot call ``PipelineResult.wait_until_finish`` method in your pipeline code for the operator

Review comment:
       I partially agree, but partially not.
    The default behaviour is:
    
    - for the streaming pipeline, wait for jobs to start,
    - for the batch pipeline, wait for the jobs to complete.
    
    But there may be specific cases like:
    
    - the user doesn't want to wait for the end of the batch job and knows for sure that the templated batch job does not call `wait_until_finish`,
    - the user wants to wait until the streaming job is cancelled/drained (e.g. by an external API call or the web UI).
    
    It will give conscious Dataflow users the possibility of more flexible DAGs if needed (see the sketch below). What do you think about it?
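    
    A minimal sketch of those two cases, assuming `wait_until_finished` is exposed on the operator as documented above (template paths and task IDs are placeholders):
    
    ```python
    from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
    
    # Case 1: batch job, but the DAG should move on as soon as the job is running.
    start_batch_async = DataflowTemplatedJobStartOperator(
        task_id="start-batch-job-async",
        template="gs://my-bucket/templates/my-batch-template",  # placeholder
        wait_until_finished=False,  # return once JOB_STATE_RUNNING is reached
    )
    
    # Case 2: streaming job, but the task should block until the job is
    # cancelled/drained externally (e.g. via an API call or the web UI).
    start_streaming_blocking = DataflowTemplatedJobStartOperator(
        task_id="start-streaming-job-blocking",
        template="gs://my-bucket/templates/my-streaming-template",  # placeholder
        wait_until_finished=True,  # wait for the job's terminal state
    )
    ```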




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r517314642



##########
File path: airflow/providers/google/cloud/example_dags/example_dataflow.py
##########
@@ -127,6 +130,40 @@
         py_interpreter='python3',
         py_system_site_packages=False,
     )
+    # [END howto_operator_start_python_job]
+
+
+with models.DAG(
+    "example_gcp_dataflow_native_python_async",
+    default_args=default_args,
+    start_date=days_ago(1),
+    schedule_interval=None,  # Override to match your needs
+    tags=['example'],
+) as dag_native_python_async:
+    start_python_job_async = DataflowCreatePythonJobOperator(
+        task_id="start-python-job-async",
+        py_file=GCS_PYTHON,
+        py_options=[],
+        job_name='{{task.task_id}}',
+        options={
+            'output': GCS_OUTPUT,
+        },
+        py_requirements=['apache-beam[gcp]==2.21.0'],

Review comment:
       I updated it to the latest version (`2.25.0`)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk merged pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #11726:
URL: https://github.com/apache/airflow/pull/11726


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#issuecomment-720672650


   The PR should be OK to be merged with just subset of tests as it does not modify Core of Airflow. The committers might merge it or can add a label 'full tests needed' and re-run it to run all tests if they see it is needed!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ibzib commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r517596127



##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -324,6 +344,23 @@ class DataflowTemplatedJobStartOperator(BaseOperator):
             `https://cloud.google.com/dataflow/pipelines/specifying-exec-params
             <https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment>`__
     :type environment: Optional[dict]
+    :param wait_until_finished: (Optional)
+        If True, wait for the end of pipeline execution before exiting. If False,
+        it only waits for it to start (``JOB_STATE_RUNNING``).
+
+        The default behavior depends on the type of pipeline:
+
+        * for the streaming pipeline, wait for jobs to start,
+        * for the batch pipeline, wait for the jobs to complete.
+
+        .. warning::
+
+            You cannot call ``PipelineResult.wait_until_finish`` method in your pipeline code for the operator

Review comment:
       I am still confused about this part: `You cannot call ``PipelineResult.wait_until_finish`` method in your pipeline code for the operator to work properly.` What exactly will happen if `pipeline.wait_until_finish()` is called and the `wait_until_finished` param is set?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r517393241



##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -324,6 +344,23 @@ class DataflowTemplatedJobStartOperator(BaseOperator):
             `https://cloud.google.com/dataflow/pipelines/specifying-exec-params
             <https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment>`__
     :type environment: Optional[dict]
+    :param wait_until_finished: (Optional)
+        If True, wait for the end of pipeline execution before exiting. If False,
+        it only waits for it to start (``JOB_STATE_RUNNING``).
+
+        The default behavior depends on the type of pipeline:
+
+        * for the streaming pipeline, wait for jobs to start,
+        * for the batch pipeline, wait for the jobs to complete.
+
+        .. warning::
+
+            You cannot call ``PipelineResult.wait_until_finish`` method in your pipeline code for the operator
+            to work properly, i.e. you must use asynchronous execution. Otherwise, your pipeline will
+            always wait until finished. For more information, look at:

Review comment:
       `always wait until finished` means that the task will wait for the terminal state of the job. It is the current behaviour for batch jobs and is kept as the default for backward compatibility. But there may be a case where the user doesn't want to wait (after a successful job start) for the end of the batch job, but wants to go further in the DAG.
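    
    A sketch of that non-blocking pattern (how the job ID reaches the sensor depends on your DAG; the XCom pull below is a hypothetical example, and the template path is a placeholder):
    
    ```python
    from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
    from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
    from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor
    
    start_job = DataflowTemplatedJobStartOperator(
        task_id="start-batch-job",
        template="gs://my-bucket/templates/my-template",  # placeholder
        wait_until_finished=False,  # task succeeds once the job is running
    )
    
    # Tasks that do not depend on the job's output can run in the meantime;
    # only the tasks downstream of the sensor wait for the terminal state.
    wait_for_job = DataflowJobStatusSensor(
        task_id="wait-for-batch-job",
        job_id="{{ ti.xcom_pull('start-batch-job') }}",  # hypothetical: adapt to how your DAG exposes the job ID
        expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
    )
    
    start_job >> wait_for_job
    ```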




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r509934863



##########
File path: airflow/providers/google/cloud/sensors/dataflow.py
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Bigquery sensor."""
+from typing import Optional, Sequence, Union, Set
+
+from airflow.exceptions import AirflowException
+
+from airflow.providers.google.cloud.hooks.dataflow import (
+    DataflowHook,
+    DEFAULT_DATAFLOW_LOCATION,
+    DataflowJobStatus,
+)
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DataflowJobStatusSensor(BaseSensorOperator):
+    """
+    Checks for the status of a job in Google Dataflow.
+
+    :param job_id: ID of the job to be checked.
+    :type job_id: str
+    :param expected_statuses: The expected state of the operation.
+        See:
+        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
+    :type expected_statuses: Union[Set[str], str]
+    :param project_id: Optional, the Google Cloud project ID in which to start a job.
+        If set to None or missing, the default project_id from the Google Cloud connection is used.
+    :type project_id: str
+    :param location: Job location.
+    :type location: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = ['job_id']
+
+    @apply_defaults
+    def __init__(

Review comment:
       nit: the order of params in the init signature, the docstring, and the attribute assignments is not the same. ;)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#issuecomment-721718263


   [The Workflow run](https://github.com/apache/airflow/actions/runs/345559772) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] TobKed commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
TobKed commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r517381893



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -920,3 +942,30 @@ def cancel_job(
             drain_pipeline=self.drain_pipeline,
         )
         jobs_controller.cancel()
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_job(
+        self,
+        job_id: str,
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+    ) -> dict:
+        """
+        Gets the job with the specified Job ID.
+
+        :param job_id: Job ID to get.
+        :type job_id: str
+        :param project_id: Optional, the Google Cloud project ID in which to start a job.
+            If set to None or missing, the default project_id from the Google Cloud connection is used.
+        :type project_id:
+        :param location: Job location.

Review comment:
       Added information, example value and link. Thanks




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#issuecomment-721445181


   Can you please rebase your PR on the latest master, since we have applied [Black](https://github.com/apache/airflow/commit/4e8f9cc8d02b29c325b8a5a76b4837671bdf5f68) and [PyUpgrade](https://github.com/apache/airflow/commit/8c42cf1b00c90f0d7f11b8a3a455381de8e003c5) on master?
   
   It will help if you squash your commits into a single commit first so that there are fewer conflicts.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org