Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/08/09 15:32:17 UTC

[GitHub] [airflow] vinitpayal opened a new pull request, #25622: Google Cloud Tasks Sensor for queue being empty

vinitpayal opened a new pull request, #25622:
URL: https://github.com/apache/airflow/pull/25622



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] vinitpayal commented on pull request #25622: Google Cloud Tasks Sensor for queue being empty

Posted by GitBox <gi...@apache.org>.
vinitpayal commented on PR #25622:
URL: https://github.com/apache/airflow/pull/25622#issuecomment-1213816507

   @potiuk please let me know if anything else is needed from me to get this merged.




[GitHub] [airflow] pankajastro commented on a diff in pull request #25622: Google Cloud Tasks Sensor for queue being empty

Posted by GitBox <gi...@apache.org>.
pankajastro commented on code in PR #25622:
URL: https://github.com/apache/airflow/pull/25622#discussion_r950628969


##########
airflow/providers/google/cloud/example_dags/example_cloud_task.py:
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that senses a cloud task queue being empty.
+
+This DAG relies on the following OS environment variables:
+
+* GCP_PROJECT_ID - Google Cloud project where the cloud task queue exists.
+* GCE_ZONE - Google Cloud zone where the cloud task queue exists.
+* GCP_QUEUE_NAME - Name of the cloud task queue.
+"""
+
+import os
+from datetime import datetime
+
+from airflow import models
+from airflow.providers.google.cloud.sensors.tasks import TaskQueueEmptySensor
+
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCP_ZONE = os.environ.get('GCE_ZONE', 'europe-west1-b')
+QUEUE_NAME = os.environ.get('GCP_QUEUE_NAME', 'testqueue')
+
+
+with models.DAG(
+    'example_gcp_cloud_tasks_sensor',
+    start_date=datetime(2022, 8, 8),
+    catchup=False,
+    tags=['example'],
+) as dag:
+    # start = EmptyOperator(task_id="start")

Review Comment:
   ```suggestion
   ```



##########
airflow/providers/google/cloud/example_dags/example_cloud_task.py:
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that senses a cloud task queue being empty.
+
+This DAG relies on the following OS environment variables:
+
+* GCP_PROJECT_ID - Google Cloud project where the cloud task queue exists.
+* GCE_ZONE - Google Cloud zone where the cloud task queue exists.
+* GCP_QUEUE_NAME - Name of the cloud task queue.
+"""
+
+import os
+from datetime import datetime
+
+from airflow import models
+from airflow.providers.google.cloud.sensors.tasks import TaskQueueEmptySensor
+
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCP_ZONE = os.environ.get('GCE_ZONE', 'europe-west1-b')
+QUEUE_NAME = os.environ.get('GCP_QUEUE_NAME', 'testqueue')
+
+
+with models.DAG(
+    'example_gcp_cloud_tasks_sensor',
+    start_date=datetime(2022, 8, 8),
+    catchup=False,
+    tags=['example'],
+) as dag:
+    # start = EmptyOperator(task_id="start")
+    # [START cloud_tasks_empty_sensor]
+    gcp_cloud_tasks_sensor = TaskQueueEmptySensor(
+        project_id=GCP_PROJECT_ID,
+        location=GCP_ZONE,
+        task_id='gcp_sense_cloud_tasks_empty',
+        queue_name=QUEUE_NAME,
+    )
+    # [END cloud_tasks_empty_sensor]
+    # end = EmptyOperator(task_id="end")

Review Comment:
   ```suggestion
   ```
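
For readers unfamiliar with sensors: the example DAG above relies on Airflow calling the sensor's `poke` method repeatedly until it returns `True`. The loop below is only a rough, Airflow-free sketch of that behaviour. `wait_until` and the simulated queue depths are hypothetical names made up for illustration; `poke_interval` and `timeout` mirror the parameters of the same name on `BaseSensorOperator`, but the real scheduler logic is more involved.

```python
import time
from typing import Callable


def wait_until(
    poke: Callable[[], bool],
    poke_interval: float = 60.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Call poke() until it returns True and return the number of pokes made.

    Raises TimeoutError if poke() is still False once `timeout` seconds have passed.
    `sleep` and `clock` are injectable so the loop can be exercised without waiting.
    """
    started = clock()
    pokes = 0
    while True:
        pokes += 1
        if poke():
            return pokes
        if clock() - started >= timeout:
            raise TimeoutError(f"condition not met after {pokes} pokes")
        sleep(poke_interval)


# Simulated queue depths seen on successive pokes: the queue drains on the third.
depths = iter([5, 2, 0])
pokes_needed = wait_until(
    lambda: next(depths) == 0,
    poke_interval=0.0,
    sleep=lambda _seconds: None,  # skip real sleeping in this illustration
)
```

In the DAG itself none of this is written by hand; passing `poke_interval` or `timeout` as keyword arguments to the sensor should be enough, since they are forwarded to the base class through `**kwargs`.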





[GitHub] [airflow] vinitpayal commented on pull request #25622: Google Cloud Tasks Sensor for queue being empty

Posted by GitBox <gi...@apache.org>.
vinitpayal commented on PR #25622:
URL: https://github.com/apache/airflow/pull/25622#issuecomment-1221297600

   Thanks @pankajastro, suggestions well noted and applied.




[GitHub] [airflow] vinit-tribes commented on pull request #25622: Google Cloud Tasks Sensor for queue being empty

Posted by GitBox <gi...@apache.org>.
vinit-tribes commented on PR #25622:
URL: https://github.com/apache/airflow/pull/25622#issuecomment-1210183873

   Sorry, I wasn't aware of the process for adding documentation. I have added it now.
   
   @mik-laj @potiuk let me know if I made any mistakes in documenting the addition, as the format was a bit new to me. Otherwise, I think the PR is ready to be merged.




[GitHub] [airflow] potiuk commented on pull request #25622: Google Cloud Tasks Sensor for queue being empty

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25622:
URL: https://github.com/apache/airflow/pull/25622#issuecomment-1209558128

   Needs doc update, provider update, examples.




[GitHub] [airflow] pankajastro commented on a diff in pull request #25622: Google Cloud Tasks Sensor for queue being empty

Posted by GitBox <gi...@apache.org>.
pankajastro commented on code in PR #25622:
URL: https://github.com/apache/airflow/pull/25622#discussion_r950629022


##########
airflow/providers/google/cloud/sensors/tasks.py:
##########
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Cloud Task sensor."""
+from typing import TYPE_CHECKING, Optional, Sequence, Union
+
+from airflow.providers.google.cloud.hooks.tasks import CloudTasksHook
+from airflow.sensors.base import BaseSensorOperator
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class TaskQueueEmptySensor(BaseSensorOperator):
+    """Pulls tasks count from a cloud task queue.

Review Comment:
   ```suggestion
       """
       Pulls tasks count from a cloud task queue.
   ```





[GitHub] [airflow] pankajastro commented on a diff in pull request #25622: Google Cloud Tasks Sensor for queue being empty

Posted by GitBox <gi...@apache.org>.
pankajastro commented on code in PR #25622:
URL: https://github.com/apache/airflow/pull/25622#discussion_r950629053


##########
airflow/providers/google/cloud/sensors/tasks.py:
##########
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Cloud Task sensor."""
+from typing import TYPE_CHECKING, Optional, Sequence, Union
+
+from airflow.providers.google.cloud.hooks.tasks import CloudTasksHook
+from airflow.sensors.base import BaseSensorOperator
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class TaskQueueEmptySensor(BaseSensorOperator):
+    """Pulls tasks count from a cloud task queue.
+    Waits until the queue reports a task count of 0.
+
+    :param location: The location where the queue exists (templated)
+    :param project_id: The Google Cloud project ID that owns the queue (templated)
+    :param gcp_conn_id: The connection ID to use when connecting to Google Cloud.
+    :param queue_name: The name of the queue to check for emptiness.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "location",
+        "queue_name",
+        "gcp_conn_id",
+        "impersonation_chain",
+    )
+
+    def __init__(
+        self,
+        *,
+        location: str,
+        project_id: Optional[str] = None,
+        queue_name: Optional[str] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.location = location
+        self.project_id = project_id
+        self.queue_name = queue_name
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def poke(self, context: "Context") -> bool:
+        # self.log.info('Sensor checks existence of tasks in, %s', self.queue_name)

Review Comment:
   ```suggestion
   ```



##########
airflow/providers/google/cloud/sensors/tasks.py:
##########
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Cloud Task sensor."""
+from typing import TYPE_CHECKING, Optional, Sequence, Union
+
+from airflow.providers.google.cloud.hooks.tasks import CloudTasksHook
+from airflow.sensors.base import BaseSensorOperator
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class TaskQueueEmptySensor(BaseSensorOperator):
+    """Pulls tasks count from a cloud task queue.
+    Waits until the queue reports a task count of 0.
+
+    :param location: The location where the queue exists (templated)
+    :param project_id: The Google Cloud project ID that owns the queue (templated)
+    :param gcp_conn_id: The connection ID to use when connecting to Google Cloud.
+    :param queue_name: The name of the queue to check for emptiness.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "location",
+        "queue_name",
+        "gcp_conn_id",
+        "impersonation_chain",
+    )
+
+    def __init__(
+        self,
+        *,
+        location: str,
+        project_id: Optional[str] = None,
+        queue_name: Optional[str] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.location = location
+        self.project_id = project_id
+        self.queue_name = queue_name
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def poke(self, context: "Context") -> bool:
+        # self.log.info('Sensor checks existence of tasks in, %s', self.queue_name)
+
+        hook = CloudTasksHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        # TODO uncomment page_size once https://issuetracker.google.com/issues/155978649?pli=1 gets fixed

Review Comment:
   ```suggestion
   ```
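
The quoted hunk stops before the hook is actually queried, so the rest of `poke` is not visible here. The sketch below shows one plausible way such a check could finish, under the assumption that the hook offers a `list_tasks` call returning the queue's pending tasks as a list. `FakeTasksHook` and `queue_is_empty` are hypothetical stand-ins invented so the snippet runs without Airflow or Google Cloud credentials; they are not the code from the PR.

```python
from typing import Dict, List, Optional, Tuple


class FakeTasksHook:
    """Hypothetical stand-in for a Cloud Tasks hook, backed by an in-memory dict."""

    def __init__(self, tasks_by_queue: Dict[Tuple[str, str], List[str]]) -> None:
        self._tasks_by_queue = tasks_by_queue

    def list_tasks(
        self,
        location: str,
        queue_name: str,
        project_id: Optional[str] = None,
    ) -> List[str]:
        # Return a copy of the pending tasks for the given (location, queue) pair.
        return list(self._tasks_by_queue.get((location, queue_name), []))


def queue_is_empty(
    hook: FakeTasksHook,
    location: str,
    queue_name: str,
    project_id: Optional[str] = None,
) -> bool:
    """Return True when the queue has no pending tasks (what poke would return)."""
    tasks = hook.list_tasks(
        location=location,
        queue_name=queue_name,
        project_id=project_id,
    )
    return len(tasks) == 0


hook = FakeTasksHook({("europe-west1", "busy"): ["task-1", "task-2"]})
busy_empty = queue_is_empty(hook, "europe-west1", "busy")
idle_empty = queue_is_empty(hook, "europe-west1", "idle")
```

Listing every task just to test for emptiness fetches more than is needed; the TODO in the hunk suggests the author intended to cap the page size once the linked upstream issue is resolved.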





[GitHub] [airflow] potiuk merged pull request #25622: Google Cloud Tasks Sensor for queue being empty

Posted by GitBox <gi...@apache.org>.
potiuk merged PR #25622:
URL: https://github.com/apache/airflow/pull/25622

