Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/01/14 18:18:28 UTC

[GitHub] [airflow] alekseiloginov opened a new pull request #20882: [WIP] Add Looker PDT operators

alekseiloginov opened a new pull request #20882:
URL: https://github.com/apache/airflow/pull/20882


   WIP: Add Looker PDT operators
   
   This PR adds new Airflow operators that use the upcoming Looker PDT (persistent derived table) Start/Stop API. These operators let you manage the materialization of Looker PDTs through the Looker API. Currently supported operations are:
   - Start PDT build
   - Check PDT build status
   - Stop PDT build
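The following is a minimal sketch of the start/check/stop lifecycle that these operators drive. `FakePdtApi` is a hypothetical in-memory stand-in, not the Looker SDK. Its method and argument names mirror the hook code later in this thread. It returns the JSON text directly rather than a `MaterializePDT` object. The `"killed"` status string and the `force_rebuild` parameter are assumptions made for illustration.

```python
import json
from typing import Dict, Optional


class FakePdtApi:
    """Hypothetical in-memory stand-in for the Looker PDT Start/Stop API (not the real SDK)."""

    def __init__(self) -> None:
        self._status: Dict[str, str] = {}  # materialization id -> status string
        self._next_id = 1

    def start_pdt_build(self, model_name: str, view_name: str, source: str, **params: str) -> str:
        # Enqueue a job and hand back its materialization id.
        job_id = str(self._next_id)
        self._next_id += 1
        self._status[job_id] = "running"
        return job_id

    def check_pdt_build(self, materialization_id: str) -> str:
        # Return the JSON text that the PR's stub carries in ``resp_text``.
        return json.dumps({"status": self._status[materialization_id]})

    def stop_pdt_build(self, materialization_id: str, source: str) -> str:
        # Mark the job as cancelled; "killed" is an assumed status string.
        self._status[materialization_id] = "killed"
        return json.dumps({"success": True})


def start_build(
    api: FakePdtApi, model: str, view: str, query_params: Optional[Dict[str, str]] = None
) -> str:
    # Unpacking an empty dict adds no kwargs, so ``**(query_params or {})``
    # is equivalent to the hook's if/else on query_params.
    return api.start_pdt_build(
        model_name=model, view_name=view, source="airflow", **(query_params or {})
    )


api = FakePdtApi()
job_id = start_build(api, "my_model", "my_view", {"force_rebuild": "true"})
running = json.loads(api.check_pdt_build(job_id))["status"]
api.stop_pdt_build(job_id, source="airflow")
stopped = json.loads(api.check_pdt_build(job_id))["status"]
print(job_id, running, stopped)  # 1 running killed
```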
   
   This is still a work in progress, so the PR is currently in draft mode.
   
   TODO: update the description.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] alekseiloginov commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r818210161



##########
File path: docs/apache-airflow-providers-google/operators/cloud/looker.rst
##########
@@ -39,29 +39,7 @@ To use these operators, you must do a few things:
 
 Detailed information is available for :doc:`Installation <apache-airflow:installation/index>`.
 
-Communication between Airflow and Looker is done via `Looker API <https://docs.looker.com/reference/api-and-integration/api-reference/v4.0>`_.
-To facilitate the API communication Looker operators use `Looker SDK <https://pypi.org/project/looker-sdk/>`_ as an API client.
-Before calling API, Looker SDK needs to authenticate itself using your Looker API credentials.
-
-* Obtain your Looker API credentials using instructions in the `Looker API documentation <https://docs.looker.com/reference/api-and-integration/api-auth#authentication_with_an_sdk>`_.
-
-* Obtain your Looker API path and port as described in the `Looker API documentation <https://docs.looker.com/reference/api-and-integration/api-getting-started#looker_api_path_and_port>`_.
-
-* Setup a Looker connection in Airflow. You can check :doc:`apache-airflow:howto/connection`
-
-See the following example of a connection setup:
-
-.. code-block::
-
-  Connection Id: your_conn_id  # Passed to operators as ``looker_conn_id`` parameter.
-  Connection Type: HTTP
-  Host: https://your.looker.com  # Base URL for Looker API. Do not include /api/* in the URL.
-  Login: YourClientID  # Looker API client id
-  Password: YourClientSecret  # Looker API client secret
-  Port: 19999  # Port for Looker API. If hosted on GCP, don't specify the port leaving just the host.
-  # Extras are optional parameters in JSON format.
-  Extra: {"verify_ssl": "true",  # Set to false only if testing locally against self-signed certs. Defaults to true if not specified.
-          "timeout": "120"}  # Timeout in seconds for HTTP requests. Defaults to 2 minutes (120) seconds if not specified.
+* Set up a Looker connection in Airflow. You can check :doc:`apache-airflow:howto/connection` and :doc:`apache-airflow-providers-google:connections/gcp_looker`

Review comment:
       Done! ✅
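The removed connection example maps connection fields onto the settings the Looker SDK needs to authenticate. The sketch below shows one way that mapping could work. It assumes the hook's settings class (`LookerApiSettings`, whose body is not shown in this thread) follows the defaults documented in that example: `verify_ssl` defaults to true, `timeout` defaults to 120 seconds, and the port is optional. `LookerSettingsSketch` and `settings_from_connection` are hypothetical names.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class LookerSettingsSketch:
    """Hypothetical container for what the Looker SDK needs to authenticate."""

    base_url: str
    client_id: str
    client_secret: str
    verify_ssl: bool
    timeout: int


def settings_from_connection(
    host: str,
    login: str,
    password: str,
    port: Optional[int] = None,
    extra: Optional[str] = None,
) -> LookerSettingsSketch:
    # Extras arrive as a JSON string, often with string values, e.g. {"timeout": "120"}.
    extras = json.loads(extra) if extra else {}
    # Base URL is the host plus an optional port (omitted when hosted on GCP).
    base_url = host.rstrip("/")
    if port:
        base_url = f"{base_url}:{port}"
    # verify_ssl defaults to true; only an explicit "false" (string or JSON bool) disables it.
    verify_ssl = str(extras.get("verify_ssl", "true")).lower() != "false"
    # timeout for HTTP requests defaults to 120 seconds.
    timeout = int(extras.get("timeout", 120))
    return LookerSettingsSketch(base_url, login, password, verify_ssl, timeout)
```

For example, `settings_from_connection("https://your.looker.com", "YourClientID", "YourClientSecret", 19999, '{"verify_ssl": "true", "timeout": "120"}')` yields a base URL of `https://your.looker.com:19999`.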







[GitHub] [airflow] alekseiloginov commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r817920326



##########
File path: airflow/providers/google/cloud/hooks/looker.py
##########
@@ -0,0 +1,308 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains a Google Cloud Looker hook."""
+
+import json
+import time
+from enum import Enum
+from typing import Dict, Optional
+
+# TODO: Temporary import for Looker API response stub, remove once looker sdk 22.2 released
+import attr
+from looker_sdk.rtl import api_settings, auth_session, requests_transport, serialize
+# TODO: Temporary import for Looker API response stub, remove once looker sdk 22.2 released
+from looker_sdk.rtl.model import Model
+from looker_sdk.sdk.api40 import methods as methods40
+from packaging.version import parse as parse_version
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models.connection import Connection
+from airflow.version import version
+
+# TODO: uncomment once looker sdk 22.2 released
+# from looker_sdk.sdk.api40.models import MaterializePDT
+
+
+@attr.s(auto_attribs=True, init=False)
+class MaterializePDT(Model):
+    """
+    TODO: Temporary API response stub. Use Looker API class once looker sdk 22.2 released.
+    Attributes:
+    materialization_id: The ID of the enqueued materialization job, if any
+    resp_text: The string response
+    """
+
+    materialization_id: str
+    resp_text: Optional[str] = None
+
+    def __init__(self, *, materialization_id: str, resp_text: Optional[str] = None):
+        self.materialization_id = materialization_id
+        self.resp_text = resp_text
+
+
+class LookerHook(BaseHook):
+    """Hook for Looker APIs."""
+
+    def __init__(
+        self,
+        looker_conn_id: str,
+    ) -> None:
+        super().__init__()
+        self.looker_conn_id = looker_conn_id
+        self.source = self.get_api_source()
+
+    def start_pdt_build(
+        self,
+        model: str,
+        view: str,
+        query_params: Optional[Dict] = None,
+    ) -> MaterializePDT:
+        """
+        Submits a PDT materialization job to Looker.
+
+        :param model: Required. The model of the PDT to start building.
+        :param view: Required. The view of the PDT to start building.
+        :param query_params: Optional. Additional materialization parameters.
+        """
+        self.log.info("Submitting PDT materialization job. Model: '%s', view: '%s'.", model, view)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        sdk = self.get_looker_sdk()
+        looker_ver = sdk.versions().looker_release_version
+        if parse_version(looker_ver) < parse_version("22.2.0"):
+            raise AirflowException(f'This API requires Looker version 22.2+. Found: {looker_ver}.')
+
+        # TODO: Temporary API response stub. Use Looker API once looker sdk 22.2 released.
+        resp = MaterializePDT(materialization_id='366', resp_text=None)
+        # unpack query_params dict into kwargs (if not None)
+        # if query_params:
+        #     resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source, **query_params)
+        # else:
+        #     resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source)
+
+        self.log.info("Start PDT build response: '%s'.", resp)
+
+        return resp
+
+    def check_pdt_build(
+        self,
+        materialization_id: str,
+    ) -> MaterializePDT:
+        """
+        Gets the PDT materialization job status from Looker.
+
+        :param materialization_id: Required. The materialization id to check status for.
+        """
+        self.log.info("Requesting PDT materialization job status. Job id: %s.", materialization_id)
+
+        # TODO: Temporary API response stub. Use Looker API once looker sdk 22.2 released.
+        resp = MaterializePDT(
+            materialization_id='366',
+            resp_text='{"status":"running","runtime":null,'
+            '"message":"Materialize 100m_sum_aggregate_sdt, ",'
+            '"task_slug":"f522424e00f0039a8c8f2d53d773f310"}',
+        )
+        sdk = self.get_looker_sdk()  # NOQA
+        # resp = sdk.check_pdt_build(materialization_id=materialization_id)
+
+        self.log.info("Check PDT build response: '%s'.", resp)
+        return resp
+
+    def pdt_build_status(
+        self,
+        materialization_id: str,
+    ) -> Dict:
+        """
+        Gets the PDT materialization job status.
+
+        :param materialization_id: Required. The materialization id to check status for.
+        """
+        resp = self.check_pdt_build(materialization_id=materialization_id)
+
+        status_json = resp['resp_text']
+        status_dict = json.loads(status_json)
+
+        self.log.info(
+            "PDT materialization job id: %s. Status: '%s'.", materialization_id, status_dict['status']
+        )
+
+        return status_dict
+
+    def stop_pdt_build(
+        self,
+        materialization_id: str,
+    ) -> MaterializePDT:
+        """
+        Starts a PDT materialization job cancellation request.
+
+        :param materialization_id: Required. The materialization id to stop.
+        """
+        self.log.info("Stopping PDT materialization. Job id: %s.", materialization_id)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        # TODO: Temporary API response stub. Use Looker API once looker sdk 22.2 released.
+        resp = MaterializePDT(
+            materialization_id='366',
+            resp_text='{"success":true,"connection":"incremental_pdts_test",'
+            '"statusment":"KILL 810852",'
+            '"task_slug":"6bad8184f94407134251be8fd18af834"}',
+        )

Review comment:
       Same here: I removed this completely, since Looker SDK 22.2 has now been released.
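With the stub gone, `pdt_build_status` decodes the JSON in `resp_text`, and `wait_for_job` polls it until it reaches a terminal state. The sketch below is a standalone version of that polling loop. The `JobStatus` string values are assumptions, since the stub only shows `"running"`. It raises `RuntimeError`/`TimeoutError` instead of `AirflowException` so it runs without Airflow. Unlike the PR's loop, it checks once before the first sleep.

```python
import json
import time
from enum import Enum
from typing import Callable, Dict, Optional


class JobStatus(Enum):
    # Assumed status strings; the PR's stub only shows "running".
    PENDING = "new"
    RUNNING = "running"
    CANCELLED = "killed"
    DONE = "complete"
    ERROR = "error"
    UNKNOWN = "unknown"


TERMINAL = {s.value for s in (JobStatus.DONE, JobStatus.ERROR, JobStatus.CANCELLED, JobStatus.UNKNOWN)}


def parse_status(resp_text: str) -> Dict:
    """Decode the JSON string carried in ``resp_text``, as ``pdt_build_status`` does."""
    return json.loads(resp_text)


def wait_for_job(check: Callable[[], str], wait_time: float = 10, timeout: Optional[float] = None) -> Dict:
    """Poll ``check()`` until the job reaches a terminal status; raise unless it completed."""
    start = time.monotonic()
    while True:
        status_dict = parse_status(check())
        status = status_dict["status"]
        if status in TERMINAL:
            break
        if timeout is not None and time.monotonic() - start > timeout:
            raise TimeoutError(f"PDT materialization job is not ready after {timeout}s.")
        time.sleep(wait_time)
    if status == JobStatus.ERROR.value:
        raise RuntimeError(f"PDT materialization job failed: {status_dict.get('message')}")
    if status in (JobStatus.CANCELLED.value, JobStatus.UNKNOWN.value):
        raise RuntimeError(f"PDT materialization job ended with status {status!r}.")
    return status_dict


# Usage: feed the PR's stub response, then an (assumed) completion response.
responses = iter([
    '{"status":"running","runtime":null,"message":"Materialize 100m_sum_aggregate_sdt, ",'
    '"task_slug":"f522424e00f0039a8c8f2d53d773f310"}',
    '{"status":"complete"}',
])
result = wait_for_job(lambda: next(responses), wait_time=0)
```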







[GitHub] [airflow] mik-laj commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r817942058



##########
File path: airflow/providers/google/cloud/hooks/looker.py
##########
@@ -0,0 +1,268 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains a Google Cloud Looker hook."""
+
+import json
+import time
+from enum import Enum
+from typing import Dict, Optional
+
+from looker_sdk.rtl import api_settings, auth_session, requests_transport, serialize
+from looker_sdk.sdk.api40 import methods as methods40
+from packaging.version import parse as parse_version
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models.connection import Connection
+from airflow.version import version
+
+
+class LookerHook(BaseHook):
+    """Hook for Looker APIs."""
+
+    def __init__(

Review comment:
       Yes, I mean this dropdown list.
   I think you should create a new connection type and then add new fields to the connection form (`timeout`, `verify_ssl`).
   https://github.com/apache/airflow/blob/3f9295a4f4f478aaeab4f6ac44a8a9a57d428d22/airflow/providers/snowflake/hooks/snowflake.py#L79-L132
   This will also let you rename the "Login" field to "Client ID" and the "Password" field to "Client Secret".
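The linked Snowflake hook relabels and hides connection-form fields through a static `get_ui_field_behaviour()` method. Below is a minimal sketch of that convention applied to Looker. The class is standalone so it runs without Airflow; a real provider hook would subclass `BaseHook`. `conn_type = "gcp_looker"`, the hook name, and the placeholder values are assumptions. Dedicated `timeout`/`verify_ssl` form fields would also need `get_connection_form_widgets`, which depends on WTForms and Flask-AppBuilder, so it is left out here.

```python
from typing import Any, Dict


class LookerHookSketch:
    """Hypothetical Looker hook showing only the connection-form customization."""

    conn_name_attr = "looker_conn_id"
    default_conn_name = "looker_default"
    conn_type = "gcp_looker"  # assumed connection type name
    hook_name = "Google Cloud Looker"  # assumed display name

    @staticmethod
    def get_ui_field_behaviour() -> Dict[str, Any]:
        """Describe how the Airflow UI should render this connection type's form."""
        return {
            # The Looker API has no notion of a schema, so hide that field.
            "hidden_fields": ["schema"],
            # Show Looker's terminology instead of the generic Login/Password.
            "relabeling": {"login": "Client ID", "password": "Client Secret"},
            "placeholders": {
                "host": "https://your.looker.com",
                "port": "19999",
                "extra": '{"verify_ssl": "true", "timeout": "120"}',
            },
        }
```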







[GitHub] [airflow] mik-laj commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r818072599



##########
File path: airflow/providers/google/cloud/hooks/looker.py
##########
@@ -0,0 +1,268 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains a Google Cloud Looker hook."""
+
+import json
+import time
+from enum import Enum
+from typing import Dict, Optional
+
+from looker_sdk.rtl import api_settings, auth_session, requests_transport, serialize
+from looker_sdk.sdk.api40 import methods as methods40
+from packaging.version import parse as parse_version
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models.connection import Connection
+from airflow.version import version
+
+
+class LookerHook(BaseHook):
+    """Hook for Looker APIs."""
+
+    def __init__(

Review comment:
       In the future, users will have to consciously migrate to the new authentication method, which is likely to take some time. As part of that migration, we can start accepting GCP connections and add "(Deprecated)" to the name of the old connection type. I don't see why we should make configuring a connection harder now by leaving this connection type out of the UI.
   
   If you don't want to add the ability to easily create connections via the UI, we can limit ourselves to documentation, but we should make sure that everything is described precisely.







[GitHub] [airflow] potiuk commented on pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1060791616









[GitHub] [airflow] potiuk closed pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
potiuk closed pull request #20882:
URL: https://github.com/apache/airflow/pull/20882


   





[GitHub] [airflow] alekseiloginov commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r817118840



##########
File path: docs/apache-airflow-providers-google/operators/cloud/looker.rst
##########
@@ -0,0 +1,93 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+Google Cloud Looker Operators
+===============================
+
+Looker is a business intelligence software and big data analytics platform that
+helps you explore, analyze and share real-time business analytics easily.
+
+Looker has a Public API and associated SDK clients in different languages,
+which allow programmatic access to the Looker data platform.
+
+For more information visit `Looker API documentation <https://docs.looker.com/reference/api-and-integration>`_.
+
+Prerequisite Tasks
+------------------
+
+To use these operators, you must do a few things:
+
+* Install API libraries via **pip**.
+
+.. code-block:: bash
+
+  pip install 'apache-airflow[google]'
+
+Detailed information is available for :doc:`Installation <apache-airflow:installation/index>`.
+
+Communication between Airflow and Looker is done via `Looker API <https://docs.looker.com/reference/api-and-integration/api-reference/v4.0>`_.
+To facilitate the API communication Looker operators use `Looker SDK <https://pypi.org/project/looker-sdk/>`_ as an API client.
+Before calling API, Looker SDK needs to authenticate itself using your Looker API credentials.
+
+* Obtain your Looker API credentials using instructions in the `Looker API documentation <https://docs.looker.com/reference/api-and-integration/api-auth#authentication_with_an_sdk>`_.
+
+* Obtain your Looker API path and port as described in the `Looker API documentation <https://docs.looker.com/reference/api-and-integration/api-getting-started#looker_api_path_and_port>`_.
+
+* Set up a Looker connection in Airflow. You can check :doc:`apache-airflow:howto/connection`
+
+See the following example of a connection setup:

Review comment:
       Got it! I was under the impression that only connections with their own `Connection Type` go there, and our connection type is `HTTP`. Sure, I can create a new connection called something like `Google Cloud Looker Connection` there. I will also add a URL configuration example.
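A URL configuration example would follow Airflow's connection-URI convention for `AIRFLOW_CONN_{CONN_ID}` environment variables. Under that convention, the connection type is the scheme, extras are query parameters, and every component is URL-encoded. The sketch below builds such a URI from the fields of the example connection. `looker_conn_uri` is a hypothetical helper. Because the example host itself contains a scheme (`https://`), it must be percent-encoded. Whether a given Airflow version decodes it back exactly is an assumption you can check by building `airflow.models.connection.Connection(uri=...)` and inspecting `.host`.

```python
from typing import Dict, Optional
from urllib.parse import quote, urlencode


def looker_conn_uri(
    client_id: str,
    client_secret: str,
    host: str,
    port: Optional[int] = None,
    extras: Optional[Dict[str, str]] = None,
    conn_type: str = "http",
) -> str:
    """Build an Airflow-style connection URI with every component percent-encoded."""
    # safe="" also encodes ':' '/' '@', so credentials and the scheme inside host stay intact.
    netloc = f"{quote(client_id, safe='')}:{quote(client_secret, safe='')}@{quote(host, safe='')}"
    if port is not None:
        netloc += f":{port}"
    uri = f"{conn_type}://{netloc}"
    if extras:
        # Extras become query parameters, e.g. ?verify_ssl=true&timeout=120
        uri += "?" + urlencode(extras)
    return uri


print(looker_conn_uri("YourClientID", "YourClientSecret", "https://your.looker.com",
                      19999, {"verify_ssl": "true", "timeout": "120"}))
# http://YourClientID:YourClientSecret@https%3A%2F%2Fyour.looker.com:19999?verify_ssl=true&timeout=120
```

The printed value would be exported as, for example, `AIRFLOW_CONN_YOUR_CONN_ID`.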







[GitHub] [airflow] alekseiloginov commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r818180224



##########
File path: airflow/providers/google/cloud/hooks/looker.py
##########
@@ -0,0 +1,257 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains a Google Cloud Looker hook."""
+
+import json
+import time
+from enum import Enum
+from typing import Dict, Optional
+
+from looker_sdk.rtl import api_settings, auth_session, requests_transport, serialize
+from looker_sdk.sdk.api40 import methods as methods40
+from packaging.version import parse as parse_version
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models.connection import Connection
+from airflow.version import version
+
+
+class LookerHook(BaseHook):
+    """Hook for Looker APIs."""
+
+    def __init__(
+        self,
+        looker_conn_id: str,
+    ) -> None:
+        super().__init__()
+        self.looker_conn_id = looker_conn_id
+        # source is used to track origin of the requests
+        self.source = f'airflow:{version}'
+
+    def start_pdt_build(
+        self,
+        model: str,
+        view: str,
+        query_params: Optional[Dict] = None,
+    ):
+        """
+        Submits a PDT materialization job to Looker.
+
+        :param model: Required. The model of the PDT to start building.
+        :param view: Required. The view of the PDT to start building.
+        :param query_params: Optional. Additional materialization parameters.
+        """
+        self.log.info("Submitting PDT materialization job. Model: '%s', view: '%s'.", model, view)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        sdk = self.get_looker_sdk()
+        looker_ver = sdk.versions().looker_release_version
+        if parse_version(looker_ver) < parse_version("22.2.0"):
+            raise AirflowException(f'This API requires Looker version 22.2+. Found: {looker_ver}.')
+
+        # unpack query_params dict into kwargs (if not None)
+        if query_params:
+            resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source, **query_params)
+        else:
+            resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source)
+
+        self.log.info("Start PDT build response: '%s'.", resp)
+
+        return resp
+
+    def check_pdt_build(
+        self,
+        materialization_id: str,
+    ):
+        """
+        Gets the PDT materialization job status from Looker.
+
+        :param materialization_id: Required. The materialization id to check status for.
+        """
+        self.log.info("Requesting PDT materialization job status. Job id: %s.", materialization_id)
+
+        sdk = self.get_looker_sdk()
+        resp = sdk.check_pdt_build(materialization_id=materialization_id)
+
+        self.log.info("Check PDT build response: '%s'.", resp)
+        return resp
+
+    def pdt_build_status(
+        self,
+        materialization_id: str,
+    ) -> Dict:
+        """
+        Gets the PDT materialization job status.
+
+        :param materialization_id: Required. The materialization id to check status for.
+        """
+        resp = self.check_pdt_build(materialization_id=materialization_id)
+
+        status_json = resp['resp_text']
+        status_dict = json.loads(status_json)
+
+        self.log.info(
+            "PDT materialization job id: %s. Status: '%s'.", materialization_id, status_dict['status']
+        )
+
+        return status_dict
+
+    def stop_pdt_build(
+        self,
+        materialization_id: str,
+    ):
+        """
+        Starts a PDT materialization job cancellation request.
+
+        :param materialization_id: Required. The materialization id to stop.
+        """
+        self.log.info("Stopping PDT materialization. Job id: %s.", materialization_id)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        sdk = self.get_looker_sdk()
+        resp = sdk.stop_pdt_build(materialization_id=materialization_id, source=self.source)
+
+        self.log.info("Stop PDT build response: '%s'.", resp)
+        return resp
+
+    def wait_for_job(
+        self,
+        materialization_id: str,
+        wait_time: int = 10,
+        timeout: Optional[int] = None,
+    ) -> None:
+        """
+        Helper method which polls a PDT materialization job to check if it finishes.
+
+        :param materialization_id: Required. The materialization id to wait for.
+        :param wait_time: Optional. Number of seconds between checks.
+        :param timeout: Optional. How many seconds to wait for the job to be ready.
+        Used only if ``asynchronous`` is False.
+        """
+        self.log.info('Waiting for PDT materialization job to complete. Job id: %s.', materialization_id)
+
+        status = None
+        start = time.monotonic()
+
+        while status not in (
+            JobStatus.DONE.value,
+            JobStatus.ERROR.value,
+            JobStatus.CANCELLED.value,
+            JobStatus.UNKNOWN.value,
+        ):
+
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(
+                    f"Timeout: PDT materialization job is not ready after {timeout}s. "
+                    f"Job id: {materialization_id}."
+                )
+
+            time.sleep(wait_time)
+
+            status_dict = self.pdt_build_status(materialization_id=materialization_id)
+            status = status_dict['status']
+
+        if status == JobStatus.ERROR.value:
+            msg = status_dict['message']
+            raise AirflowException(
+                f'PDT materialization job failed. Job id: {materialization_id}. Message:\n"{msg}"'
+            )
+        if status == JobStatus.CANCELLED.value:
+            raise AirflowException(f'PDT materialization job was cancelled. Job id: {materialization_id}.')
+        if status == JobStatus.UNKNOWN.value:
+            raise AirflowException(
+                f'PDT materialization job has unknown status. Job id: {materialization_id}.'
+            )
+
+        self.log.info('PDT materialization job completed successfully. Job id: %s.', materialization_id)
+
+    def get_looker_sdk(self):
+        """Returns Looker SDK client for Looker API 4.0."""
+
+        conn = self.get_connection(self.looker_conn_id)
+        settings = LookerApiSettings(conn)
+
+        transport = requests_transport.RequestsTransport.configure(settings)
+        return methods40.Looker40SDK(
+            auth_session.AuthSession(settings, transport, serialize.deserialize40, "4.0"),
+            serialize.deserialize40,
+            serialize.serialize,
+            transport,
+            "4.0",
+        )
+

Review comment:
       I tried to add this, but it looks like it only works when the hook is [associated](https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html#test-connections) with the connection type. Since I'd prefer to keep it as HTTP for now, we would need to wait until we switch to the GCP connection type before adding this feature.







[GitHub] [airflow] potiuk commented on pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1057827841


   Unfortunately we can't merge this change yet: it would prevent the whole Google provider from being installed on Python 3.9.10, so we cannot merge it until the problem is solved.
   
   I flagged the issue in Looker SDK : https://github.com/looker-open-source/sdk-codegen/pull/949 





[GitHub] [airflow] alekseiloginov commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r817035826



##########
File path: airflow/providers/google/cloud/hooks/looker.py
##########
@@ -0,0 +1,268 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains a Google Cloud Looker hook."""
+
+import json
+import time
+from enum import Enum
+from typing import Dict, Optional
+
+from looker_sdk.rtl import api_settings, auth_session, requests_transport, serialize
+from looker_sdk.sdk.api40 import methods as methods40
+from packaging.version import parse as parse_version
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models.connection import Connection
+from airflow.version import version
+
+
+class LookerHook(BaseHook):
+    """Hook for Looker APIs."""
+
+    def __init__(
+        self,
+        looker_conn_id: str,
+    ) -> None:
+        super().__init__()
+        self.looker_conn_id = looker_conn_id
+        self.source = self.get_api_source()
+
+    def start_pdt_build(
+        self,
+        model: str,
+        view: str,
+        query_params: Optional[Dict] = None,
+    ):
+        """
+        Submits a PDT materialization job to Looker.
+
+        :param model: Required. The model of the PDT to start building.
+        :param view: Required. The view of the PDT to start building.
+        :param query_params: Optional. Additional materialization parameters.
+        """
+        self.log.info("Submitting PDT materialization job. Model: '%s', view: '%s'.", model, view)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        sdk = self.get_looker_sdk()
+        looker_ver = sdk.versions().looker_release_version
+        if parse_version(looker_ver) < parse_version("22.2.0"):
+            raise AirflowException(f'This API requires Looker version 22.2+. Found: {looker_ver}.')
+
+        # unpack query_params dict into kwargs (if not None)
+        if query_params:
+            resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source, **query_params)
+        else:
+            resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source)
+
+        self.log.info("Start PDT build response: '%s'.", resp)
+
+        return resp
+
+    def check_pdt_build(
+        self,
+        materialization_id: str,
+    ):
+        """
+        Gets the PDT materialization job status from Looker.
+
+        :param materialization_id: Required. The materialization id to check status for.
+        """
+        self.log.info("Requesting PDT materialization job status. Job id: %s.", materialization_id)
+
+        sdk = self.get_looker_sdk()
+        resp = sdk.check_pdt_build(materialization_id=materialization_id)
+
+        self.log.info("Check PDT build response: '%s'.", resp)
+        return resp
+
+    def pdt_build_status(
+        self,
+        materialization_id: str,
+    ) -> Dict:
+        """
+        Gets the PDT materialization job status.
+
+        :param materialization_id: Required. The materialization id to check status for.
+        """
+        resp = self.check_pdt_build(materialization_id=materialization_id)
+
+        status_json = resp['resp_text']
+        status_dict = json.loads(status_json)
+
+        self.log.info(
+            "PDT materialization job id: %s. Status: '%s'.", materialization_id, status_dict['status']
+        )
+
+        return status_dict
+
+    def stop_pdt_build(
+        self,
+        materialization_id: str,
+    ):
+        """
+        Starts a PDT materialization job cancellation request.
+
+        :param materialization_id: Required. The materialization id to stop.
+        """
+        self.log.info("Stopping PDT materialization. Job id: %s.", materialization_id)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        sdk = self.get_looker_sdk()
+        resp = sdk.stop_pdt_build(materialization_id=materialization_id, source=self.source)
+
+        self.log.info("Stop PDT build response: '%s'.", resp)
+        return resp
+
+    def wait_for_job(
+        self,
+        materialization_id: str,
+        wait_time: int = 10,
+        timeout: Optional[int] = None,
+    ) -> None:
+        """
+        Helper method which polls a PDT materialization job to check if it finishes.
+
+        :param materialization_id: Required. The materialization id to wait for.
+        :param wait_time: Optional. Number of seconds between checks.
+        :param timeout: Optional. How many seconds to wait for the job to be ready.
+        Used only if ``asynchronous`` is False.
+        """
+        self.log.info('Waiting for PDT materialization job to complete. Job id: %s.', materialization_id)
+
+        status = None
+        start = time.monotonic()
+
+        while status not in (
+            JobStatus.DONE.value,
+            JobStatus.ERROR.value,
+            JobStatus.CANCELLED.value,
+            JobStatus.UNKNOWN.value,
+        ):
+
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(
+                    f"Timeout: PDT materialization job is not ready after {timeout}s. "
+                    f"Job id: {materialization_id}."
+                )
+
+            time.sleep(wait_time)
+
+            status_dict = self.pdt_build_status(materialization_id=materialization_id)
+            status = status_dict['status']
+
+        if status == JobStatus.ERROR.value:
+            msg = status_dict['message']
+            raise AirflowException(
+                f'PDT materialization job failed. Job id: {materialization_id}. Message:\n"{msg}"'
+            )
+        if status == JobStatus.CANCELLED.value:
+            raise AirflowException(f'PDT materialization job was cancelled. Job id: {materialization_id}.')
+        if status == JobStatus.UNKNOWN.value:
+            raise AirflowException(
+                f'PDT materialization job has unknown status. Job id: {materialization_id}.'
+            )
+
+        self.log.info('PDT materialization job completed successfully. Job id: %s.', materialization_id)
+
+    def get_api_source(self):
+        """Returns origin of the API call."""
+
+        # Infer API source based on Airflow version, e.g.:
+        # "2.1.4+composer" -> Cloud Composer
+        # "2.1.4" -> Airflow
+
+        self.log.debug(
+            "Got the following version for connection id: '%s'. Version: '%s'.", self.looker_conn_id, version
+        )
+
+        return 'composer' if 'composer' in version else 'airflow'

Review comment:
       I was hoping to achieve that without any extra effort from users; that's why we decided to check Airflow's `version` for additional info.
   
   I'm not sure how an env var like `AIRFLOW_SNOWFLAKE_PARTNER` would be used. Does it mean that a user needs to specify it? If so, is there a way to identify the type of Airflow deployment (standalone/managed) based on built-in data?
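The zero-configuration `version` check and an environment-variable opt-in can coexist. A minimal sketch, assuming a hypothetical variable named `AIRFLOW_LOOKER_API_SOURCE` (neither Airflow nor this PR defines it): an explicitly set value wins, and otherwise the heuristic from `get_api_source` applies. `version` is passed in as a parameter so the function can be exercised without Airflow installed.

```python
import os
from typing import Mapping, Optional

# Hypothetical variable name, for illustration only; not defined by Airflow or this PR.
SOURCE_ENV_VAR = "AIRFLOW_LOOKER_API_SOURCE"


def get_api_source(version: str, environ: Optional[Mapping[str, str]] = None) -> str:
    """Return the origin of the Looker API call.

    An explicit value in the environment takes precedence; otherwise fall back
    to the version-string heuristic ("2.1.4+composer" -> "composer").
    """
    env = os.environ if environ is None else environ
    explicit = env.get(SOURCE_ENV_VAR, "").strip().lower()
    if explicit:
        return explicit
    return "composer" if "composer" in version else "airflow"


print(get_api_source("2.1.4+composer", environ={}))  # composer
print(get_api_source("2.1.4", environ={}))  # airflow
print(get_api_source("2.1.4", environ={SOURCE_ENV_VAR: "custom"}))  # custom
```

With this shape, a managed platform could set the variable in its images, while standalone users who set nothing still get `'airflow'`.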






[GitHub] [airflow] alekseiloginov edited a comment on pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov edited a comment on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1059180977


   Thanks to @joeldodge79, we [unpinned](https://github.com/looker-open-source/sdk-codegen/pull/1015) Python 3.9.9 and released a new [looker sdk 2.2.1](https://pypi.org/project/looker-sdk/) version with that change.
   With this, I will try to rerun CI now.




[GitHub] [airflow] alekseiloginov commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r819734146



##########
File path: docs/apache-airflow-providers-google/operators/cloud/looker.rst
##########
@@ -39,29 +39,7 @@ To use these operators, you must do a few things:
 
 Detailed information is available for :doc:`Installation <apache-airflow:installation/index>`.
 
-Communication between Airflow and Looker is done via `Looker API <https://docs.looker.com/reference/api-and-integration/api-reference/v4.0>`_.
-To facilitate the API communication Looker operators use `Looker SDK <https://pypi.org/project/looker-sdk/>`_ as an API client.
-Before calling API, Looker SDK needs to authenticate itself using your Looker API credentials.
-
-* Obtain your Looker API credentials using instructions in the `Looker API documentation <https://docs.looker.com/reference/api-and-integration/api-auth#authentication_with_an_sdk>`_.
-
-* Obtain your Looker API path and port as described in the `Looker API documentation <https://docs.looker.com/reference/api-and-integration/api-getting-started#looker_api_path_and_port>`_.
-
-* Setup a Looker connection in Airflow. You can check :doc:`apache-airflow:howto/connection`
-
-See the following example of a connection setup:
-
-.. code-block::
-
-  Connection Id: your_conn_id  # Passed to operators as ``looker_conn_id`` parameter.
-  Connection Type: HTTP
-  Host: https://your.looker.com  # Base URL for Looker API. Do not include /api/* in the URL.
-  Login: YourClientID  # Looker API client id
-  Password: YourClientSecret  # Looker API client secret
-  Port: 19999  # Port for Looker API. If hosted on GCP, don't specify the port leaving just the host.
-  # Extras are optional parameters in JSON format.
-  Extra: {"verify_ssl": "true",  # Set to false only if testing locally against self-signed certs. Defaults to true if not specified.
-          "timeout": "120"}  # Timeout in seconds for HTTP requests. Defaults to 2 minutes (120) seconds if not specified.
+* Set up a Looker connection in Airflow. You can check :doc:`apache-airflow:howto/connection` and :doc:`apache-airflow-providers-google:connections/gcp_looker`

Review comment:
       Hmm, I added
   ```
   :doc:`connections/gcp_looker`
   ```
   but I'm still getting
   ```
   unknown document: connections/gcp_looker
   ```
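A likely explanation (an assumption, not verified against this docs build): Sphinx resolves a relative `:doc:` target against the directory of the document that contains it, so from `operators/cloud/looker.rst` the target `connections/gcp_looker` points at `operators/cloud/connections/gcp_looker`, which does not exist. A leading `/` makes the target relative to the source root instead. The function below is a small model of that resolution rule, not Sphinx's own code:

```python
import posixpath


def resolve_doc(referencing_doc: str, target: str) -> str:
    """Model how a :doc: target maps to a document name (no ".rst" suffix).

    Relative targets are resolved against the directory of the referencing
    document; a target starting with "/" is taken relative to the source root.
    """
    # Joining "/<doc>" with ".." yields the referencing document's directory;
    # posixpath.join discards everything before an absolute target.
    joined = posixpath.join("/" + referencing_doc, "..", target)
    return posixpath.normpath(joined)[1:]


here = "operators/cloud/looker"  # docs/apache-airflow-providers-google/operators/cloud/looker.rst
print(resolve_doc(here, "connections/gcp_looker"))   # operators/cloud/connections/gcp_looker
print(resolve_doc(here, "/connections/gcp_looker"))  # connections/gcp_looker
```

If that is the cause, `:doc:`/connections/gcp_looker`` (root-relative) or `:doc:`../../connections/gcp_looker`` should resolve, assuming `connections/gcp_looker.rst` sits at the root of the provider's docs.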






[GitHub] [airflow] mik-laj commented on pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1059817727


   Some tests failed:
   ```
   ==== Core mysql: 12 failures ====
   
   tests/executors/test_dask_executor.py::TestDaskExecutor::test_backfill_integration: ValueError: Cell is empty
   tests/executors/test_dask_executor.py::TestDaskExecutor::test_dask_executor_functions: ValueError: Cell is empty
   tests/executors/test_dask_executor.py::TestDaskExecutorTLS::test_tls: ValueError: Cell is empty
   tests/executors/test_dask_executor.py::TestDaskExecutorQueue::test_dask_queues: ValueError: Cell is empty
   tests/executors/test_dask_executor.py::TestDaskExecutorQueue::test_dask_queues_no_queue_specified: ValueError: Cell is empty
   tests/executors/test_kubernetes_executor.py::TestKubernetesExecutor::test_run_next_exception_requeue[403 Forbidden]: AttributeError: '_patch' object has no attribute 'list_namespaced_pod'
   tests/executors/test_kubernetes_executor.py::TestKubernetesExecutor::test_run_next_exception_requeue[12345 fake-unhandled-reason]: AttributeError: '_patch' object has no attribute 'list_namespaced_pod'
   tests/executors/test_kubernetes_executor.py::TestKubernetesExecutor::test_run_next_exception_requeue[422 Unprocessable Entity]: AttributeError: '_patch' object has no attribute 'list_namespaced_pod'
   tests/executors/test_kubernetes_executor.py::TestKubernetesExecutor::test_run_next_exception_requeue[400 BadRequest]: AttributeError: '_patch' object has no attribute 'list_namespaced_pod'
   tests/executors/test_kubernetes_executor.py::TestKubernetesExecutor::test_pod_template_file_override_in_executor_config: AttributeError: '_patch' object has no attribute 'list_namespaced_pod'
   tests/executors/test_kubernetes_executor.py::TestKubernetesExecutor::test_pending_pod_timeout: AssertionError: Expected 'list_namespaced_pod' to be called once. Called 6 times.
   tests/executors/test_kubernetes_executor.py::TestKubernetesExecutor::test_clear_not_launched_queued_tasks_not_launched: AssertionError: assert 12 == 2
    +  where 12 = <MagicMock name='mock.list_namespaced_pod' id='139977912019472'>.call_count
    +    where <MagicMock name='mock.list_namespaced_pod' id='139977912019472'> = <MagicMock id='139977912117200'>.list_namespaced_pod
   ```
   ```
   ==== Core sqlite: 12 failures ====
   
   tests/executors/test_dask_executor.py::TestDaskExecutor::test_backfill_integration: ValueError: Cell is empty
   tests/executors/test_dask_executor.py::TestDaskExecutor::test_dask_executor_functions: ValueError: Cell is empty
   tests/executors/test_dask_executor.py::TestDaskExecutorTLS::test_tls: ValueError: Cell is empty
   tests/executors/test_dask_executor.py::TestDaskExecutorQueue::test_dask_queues: ValueError: Cell is empty
   tests/executors/test_dask_executor.py::TestDaskExecutorQueue::test_dask_queues_no_queue_specified: ValueError: Cell is empty
   tests/executors/test_kubernetes_executor.py::TestKubernetesExecutor::test_run_next_exception_requeue[403 Forbidden]: AttributeError: '_patch' object has no attribute 'list_namespaced_pod'
   tests/executors/test_kubernetes_executor.py::TestKubernetesExecutor::test_run_next_exception_requeue[12345 fake-unhandled-reason]: AttributeError: '_patch' object has no attribute 'list_namespaced_pod'
   tests/executors/test_kubernetes_executor.py::TestKubernetesExecutor::test_run_next_exception_requeue[422 Unprocessable Entity]: AttributeError: '_patch' object has no attribute 'list_namespaced_pod'
   tests/executors/test_kubernetes_executor.py::TestKubernetesExecutor::test_run_next_exception_requeue[400 BadRequest]: AttributeError: '_patch' object has no attribute 'list_namespaced_pod'
   tests/executors/test_kubernetes_executor.py::TestKubernetesExecutor::test_pod_template_file_override_in_executor_config: AttributeError: '_patch' object has no attribute 'list_namespaced_pod'
   tests/executors/test_kubernetes_executor.py::TestKubernetesExecutor::test_pending_pod_timeout: AssertionError: Expected 'list_namespaced_pod' to be called once. Called 6 times.
   tests/executors/test_kubernetes_executor.py::TestKubernetesExecutor::test_clear_not_launched_queued_tasks_not_launched: AssertionError: assert 12 == 2
    +  where 12 = <MagicMock name='mock.list_namespaced_pod' id='140049937077712'>.call_count
    +    where <MagicMock name='mock.list_namespaced_pod' id='140049937077712'> = <MagicMock id='140049937318224'>.list_namespaced_pod
   ```




[GitHub] [airflow] alekseiloginov commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r817915035



##########
File path: airflow/providers/google/cloud/example_dags/example_looker.py
##########
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG that shows how to use various Looker
+operators to submit a PDT materialization job and manage it.
+"""
+
+from datetime import datetime
+
+from airflow import models
+from airflow.providers.google.cloud.operators.looker import LookerStartPdtBuildOperator
+from airflow.providers.google.cloud.sensors.looker import LookerCheckPdtBuildSensor
+
+with models.DAG(
+    dag_id='example_gcp_looker',
+    schedule_interval='@once',
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+) as dag:
+    # [START cloud_looker_async_start_pdt_sensor]
+    start_pdt_task_async = LookerStartPdtBuildOperator(
+        task_id='looker_pdt_task_async',

Review comment:
       Done! ✅ 
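In the asynchronous variant of this example, the start operator presumably returns right after submission and `LookerCheckPdtBuildSensor` does the waiting; the synchronous path instead blocks in `LookerHook.wait_for_job`. A minimal, stdlib-only sketch of that blocking loop, with the Looker API call replaced by an injected `fetch_status` function. Assumptions: the status strings are placeholders (the `JobStatus` values are not shown in this excerpt), `"running"` is an invented non-terminal status, and `RuntimeError`/`TimeoutError` stand in for `AirflowException`.

```python
import time
from typing import Callable, Optional

# Placeholder status strings; the real values come from the hook's JobStatus enum.
DONE, ERROR, CANCELLED, UNKNOWN = "done", "error", "cancelled", "unknown"
TERMINAL = {DONE, ERROR, CANCELLED, UNKNOWN}


def wait_for_status(
    fetch_status: Callable[[], dict],
    wait_time: float = 10,
    timeout: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> dict:
    """Poll fetch_status() until it reports a terminal status.

    Same shape as wait_for_job: check the deadline, sleep, fetch, repeat;
    then turn non-success terminal states into exceptions.
    """
    start = clock()
    status_dict: dict = {}
    while status_dict.get("status") not in TERMINAL:
        if timeout and start + timeout < clock():
            raise TimeoutError(f"job not ready after {timeout}s")
        sleep(wait_time)
        status_dict = fetch_status()

    status = status_dict["status"]
    if status == ERROR:
        raise RuntimeError(f"job failed: {status_dict.get('message')}")
    if status in (CANCELLED, UNKNOWN):
        raise RuntimeError(f"job ended with status {status!r}")
    return status_dict


# Usage with a fake status source and no real sleeping.
responses = iter([{"status": "running"}, {"status": "running"}, {"status": DONE}])
print(wait_for_status(lambda: next(responses), sleep=lambda s: None))  # {'status': 'done'}
```

Injecting `sleep` and `clock` keeps the loop testable without waiting on a real Looker instance.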






[GitHub] [airflow] alekseiloginov commented on pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1060766528


   Thanks a lot, @potiuk! Most of the tests are passing now! 👍 
   I see only one failing CI job:
   ```
   MSSQL2017-latest, Py3.9: Always API Core Other CLI Providers WWW Integration
   ```




[GitHub] [airflow] mik-laj commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r813981880



##########
File path: airflow/providers/google/cloud/hooks/looker.py
##########
@@ -0,0 +1,268 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains a Google Cloud Looker hook."""
+
+import json
+import time
+from enum import Enum
+from typing import Dict, Optional
+
+from looker_sdk.rtl import api_settings, auth_session, requests_transport, serialize
+from looker_sdk.sdk.api40 import methods as methods40
+from packaging.version import parse as parse_version
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models.connection import Connection
+from airflow.version import version
+
+
+class LookerHook(BaseHook):
+    """Hook for Looker APIs."""
+
+    def __init__(

Review comment:
       Can you check if Looker is available for selection during connection setup?







[GitHub] [airflow] mik-laj commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r817106938



##########
File path: airflow/providers/google/cloud/hooks/looker.py
##########
@@ -0,0 +1,268 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains a Google Cloud Looker hook."""
+
+import json
+import time
+from enum import Enum
+from typing import Dict, Optional
+
+from looker_sdk.rtl import api_settings, auth_session, requests_transport, serialize
+from looker_sdk.sdk.api40 import methods as methods40
+from packaging.version import parse as parse_version
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models.connection import Connection
+from airflow.version import version
+
+
+class LookerHook(BaseHook):
+    """Hook for Looker APIs."""
+
+    def __init__(
+        self,
+        looker_conn_id: str,
+    ) -> None:
+        super().__init__()
+        self.looker_conn_id = looker_conn_id
+        self.source = self.get_api_source()
+
+    def start_pdt_build(
+        self,
+        model: str,
+        view: str,
+        query_params: Optional[Dict] = None,
+    ):
+        """
+        Submits a PDT materialization job to Looker.
+
+        :param model: Required. The model of the PDT to start building.
+        :param view: Required. The view of the PDT to start building.
+        :param query_params: Optional. Additional materialization parameters.
+        """
+        self.log.info("Submitting PDT materialization job. Model: '%s', view: '%s'.", model, view)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        sdk = self.get_looker_sdk()
+        looker_ver = sdk.versions().looker_release_version
+        if parse_version(looker_ver) < parse_version("22.2.0"):
+            raise AirflowException(f'This API requires Looker version 22.2+. Found: {looker_ver}.')
+
+        # unpack query_params dict into kwargs (if not None)
+        if query_params:
+            resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source, **query_params)
+        else:
+            resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source)
+
+        self.log.info("Start PDT build response: '%s'.", resp)
+
+        return resp
+
+    def check_pdt_build(
+        self,
+        materialization_id: str,
+    ):
+        """
+        Gets the PDT materialization job status from Looker.
+
+        :param materialization_id: Required. The materialization id to check status for.
+        """
+        self.log.info("Requesting PDT materialization job status. Job id: %s.", materialization_id)
+
+        sdk = self.get_looker_sdk()
+        resp = sdk.check_pdt_build(materialization_id=materialization_id)
+
+        self.log.info("Check PDT build response: '%s'.", resp)
+        return resp
+
+    def pdt_build_status(
+        self,
+        materialization_id: str,
+    ) -> Dict:
+        """
+        Gets the PDT materialization job status.
+
+        :param materialization_id: Required. The materialization id to check status for.
+        """
+        resp = self.check_pdt_build(materialization_id=materialization_id)
+
+        status_json = resp['resp_text']
+        status_dict = json.loads(status_json)
+
+        self.log.info(
+            "PDT materialization job id: %s. Status: '%s'.", materialization_id, status_dict['status']
+        )
+
+        return status_dict
+
+    def stop_pdt_build(
+        self,
+        materialization_id: str,
+    ):
+        """
+        Starts a PDT materialization job cancellation request.
+
+        :param materialization_id: Required. The materialization id to stop.
+        """
+        self.log.info("Stopping PDT materialization. Job id: %s.", materialization_id)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        sdk = self.get_looker_sdk()
+        resp = sdk.stop_pdt_build(materialization_id=materialization_id, source=self.source)
+
+        self.log.info("Stop PDT build response: '%s'.", resp)
+        return resp
+
+    def wait_for_job(
+        self,
+        materialization_id: str,
+        wait_time: int = 10,
+        timeout: Optional[int] = None,
+    ) -> None:
+        """
+        Helper method which polls a PDT materialization job to check if it finishes.
+
+        :param materialization_id: Required. The materialization id to wait for.
+        :param wait_time: Optional. Number of seconds between checks.
+        :param timeout: Optional. How many seconds wait for job to be ready.
+        Used only if ``asynchronous`` is False.
+        """
+        self.log.info('Waiting for PDT materialization job to complete. Job id: %s.', materialization_id)
+
+        status = None
+        start = time.monotonic()
+
+        while status not in (
+            JobStatus.DONE.value,
+            JobStatus.ERROR.value,
+            JobStatus.CANCELLED.value,
+            JobStatus.UNKNOWN.value,
+        ):
+
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(
+                    f"Timeout: PDT materialization job is not ready after {timeout}s. "
+                    f"Job id: {materialization_id}."
+                )
+
+            time.sleep(wait_time)
+
+            status_dict = self.pdt_build_status(materialization_id=materialization_id)
+            status = status_dict['status']
+
+        if status == JobStatus.ERROR.value:
+            msg = status_dict['message']
+            raise AirflowException(
+                f'PDT materialization job failed. Job id: {materialization_id}. Message:\n"{msg}"'
+            )
+        if status == JobStatus.CANCELLED.value:
+            raise AirflowException(f'PDT materialization job was cancelled. Job id: {materialization_id}.')
+        if status == JobStatus.UNKNOWN.value:
+            raise AirflowException(
+                f'PDT materialization job has unknown status. Job id: {materialization_id}.'
+            )
+
+        self.log.info('PDT materialization job completed successfully. Job id: %s.', materialization_id)
+
+    def get_api_source(self):
+        """Returns origin of the API call."""
+
+        # Infer API source based on Airflow version, e.g.:
+        # "2.1.4+composer" -> Cloud Composer
+        # "2.1.4" -> Airflow
+
+        self.log.debug(
+            "Got the following version for connection id: '%s'. Version: '%s'.", self.looker_conn_id, version
+        )
+
+        return 'composer' if 'composer' in version else 'airflow'

Review comment:
       >  Does it mean that a user needs to specify it?
   
   Yes. The Cloud Composer team needs to add this variable in their Docker image. 
   ```
   ENV AIRFLOW_GOOGLE_PARTNER=composer
   ```
   
   > If so, is there a way to identify the type of Airflow (standalone/managed) based on built-in data?
   
   No, because we are not involved in the development of managed versions of Airflow and we want to be vendor-neutral. This means that we cannot change the behavior of the code depending on which vendor is running it. If a vendor wants to change the behavior of the software, that change should be made only in the vendor's environment.
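A minimal sketch of the approach described above, assuming the vendor image sets `AIRFLOW_GOOGLE_PARTNER` (for example `composer`) and that the hook folds it into the request source tag. The helper `build_api_source` and the `airflow:<version>:<partner>` format are hypothetical; the later revision of the hook in this PR simply uses `f'airflow:{version}'`.

```python
import os
from typing import Mapping, Optional


def build_api_source(airflow_version: str, environ: Optional[Mapping[str, str]] = None) -> str:
    """Hypothetical helper: derive the Looker API ``source`` tag for a request.

    The vendor opts in by setting AIRFLOW_GOOGLE_PARTNER in its image,
    so the provider code itself stays vendor-neutral.
    """
    env = os.environ if environ is None else environ
    partner = env.get("AIRFLOW_GOOGLE_PARTNER", "").strip()
    base = f"airflow:{airflow_version}"
    # Only append the partner suffix when the environment actually provides one.
    return f"{base}:{partner}" if partner else base


print(build_api_source("2.3.0", {"AIRFLOW_GOOGLE_PARTNER": "composer"}))  # airflow:2.3.0:composer
print(build_api_source("2.3.0", {}))  # airflow:2.3.0
```

Passing `environ` explicitly keeps the helper easy to test; in production it would fall back to `os.environ`.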
   







[GitHub] [airflow] alekseiloginov commented on a change in pull request #20882: Add Looker PDT operators

alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r817920959



##########
File path: airflow/providers/google/cloud/operators/looker.py
##########
@@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains Google Cloud Looker operators."""
+
+from typing import TYPE_CHECKING, Dict, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.hooks.looker import LookerHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class LookerStartPdtBuildOperator(BaseOperator):
+    """
+    Submits a PDT materialization job to Looker.
+
+    :param looker_conn_id: Required. The connection ID to use connecting to Looker.
+    :param model: Required. The model of the PDT to start building.
+    :param view: Required. The view of the PDT to start building.
+    :param query_params: Optional. Additional materialization parameters.
+    :param asynchronous: Optional. Flag indicating whether to wait for the job
+    to finish or return immediately.
+    This is useful for submitting long running jobs and
+    waiting on them asynchronously using the LookerCheckPdtBuildSensor
+    :param cancel_on_kill: Optional. Flag which indicates whether cancel the
+    hook's job or not, when on_kill is called.
+    :param wait_time: Optional. Number of seconds between checks for job to be
+    ready. Used only if ``asynchronous`` is False.
+    :param wait_timeout: Optional. How many seconds wait for job to be ready.
+    Used only if ``asynchronous`` is False.
+    """
+
+    def __init__(
+        self,
+        looker_conn_id: str,
+        model: str,
+        view: str,
+        query_params: Optional[Dict] = None,
+        asynchronous: bool = False,
+        cancel_on_kill: bool = True,
+        wait_time: int = 10,
+        wait_timeout: Optional[int] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.model = model
+        self.view = view
+        self.query_params = query_params
+        self.looker_conn_id = looker_conn_id
+        self.asynchronous = asynchronous
+        self.cancel_on_kill = cancel_on_kill
+        self.wait_time = wait_time
+        self.wait_timeout = wait_timeout
+        self.hook: Optional[LookerHook] = None
+        self.materialization_id: Optional[str] = None
+
+    def execute(self, context: "Context") -> str:
+
+        self.hook = LookerHook(looker_conn_id=self.looker_conn_id)
+
+        resp = self.hook.start_pdt_build(
+            model=self.model,
+            view=self.view,
+            query_params=self.query_params,
+        )
+
+        self.materialization_id = resp.materialization_id
+
+        # materialization_id shouldn't be None, ensure mypy about that
+        assert self.materialization_id is not None

Review comment:
       Done! ✅ 
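The `assert` in the diff above exists only to narrow `Optional[str]` to `str` for mypy. Since `assert` statements are skipped when Python runs with `-O`, one common alternative is an explicit check that mypy narrows just as well. A minimal sketch; the helper name is hypothetical, and inside Airflow one would more likely raise `AirflowException` than `ValueError`.

```python
from typing import Optional


def require_materialization_id(materialization_id: Optional[str]) -> str:
    """Hypothetical helper: return the id as ``str`` or fail loudly.

    Unlike ``assert``, this check still runs under ``python -O``, and mypy
    narrows the type to ``str`` once the ``is None`` branch has raised.
    """
    if materialization_id is None:
        # In Airflow code this would more likely be an AirflowException.
        raise ValueError("Looker did not return a materialization_id for the PDT build.")
    return materialization_id
```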







[GitHub] [airflow] alekseiloginov commented on a change in pull request #20882: Add Looker PDT operators

alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r817913461



##########
File path: docs/apache-airflow-providers-google/operators/cloud/looker.rst
##########
@@ -39,29 +39,7 @@ To use these operators, you must do a few things:
 
 Detailed information is available for :doc:`Installation <apache-airflow:installation/index>`.
 
-Communication between Airflow and Looker is done via `Looker API <https://docs.looker.com/reference/api-and-integration/api-reference/v4.0>`_.
-To facilitate the API communication Looker operators use `Looker SDK <https://pypi.org/project/looker-sdk/>`_ as an API client.
-Before calling API, Looker SDK needs to authenticate itself using your Looker API credentials.
-
-* Obtain your Looker API credentials using instructions in the `Looker API documentation <https://docs.looker.com/reference/api-and-integration/api-auth#authentication_with_an_sdk>`_.
-
-* Obtain your Looker API path and port as described in the `Looker API documentation <https://docs.looker.com/reference/api-and-integration/api-getting-started#looker_api_path_and_port>`_.
-
-* Setup a Looker connection in Airflow. You can check :doc:`apache-airflow:howto/connection`
-
-See the following example of a connection setup:
-
-.. code-block::
-
-  Connection Id: your_conn_id  # Passed to operators as ``looker_conn_id`` parameter.
-  Connection Type: HTTP
-  Host: https://your.looker.com  # Base URL for Looker API. Do not include /api/* in the URL.
-  Login: YourClientID  # Looker API client id
-  Password: YourClientSecret  # Looker API client secret
-  Port: 19999  # Port for Looker API. If hosted on GCP, don't specify the port leaving just the host.
-  # Extras are optional parameters in JSON format.
-  Extra: {"verify_ssl": "true",  # Set to false only if testing locally against self-signed certs. Defaults to true if not specified.
-          "timeout": "120"}  # Timeout in seconds for HTTP requests. Defaults to 2 minutes (120) seconds if not specified.
+* Setup a Looker connection in Airflow. You can check :doc:`apache-airflow:howto/connection` and :doc:`apache-airflow-providers-google:connections/gcp_looker`

Review comment:
       I added a link to the new Looker connection page here:
   ```
   :doc:`apache-airflow-providers-google:connections/gcp_looker`
   ```
   @mik-laj, I hope that's the correct way to reference it (I used :doc:`apache-airflow:howto/connection` as a reference).
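The connection example removed from this page documents how the HTTP connection fields map onto Looker API settings: `Host` is the base URL without `/api/*`, `Port` is appended only when set (omitted for GCP-hosted instances), and the optional extras `verify_ssl` and `timeout` default to `true` and 120 seconds. A stdlib-only sketch of that mapping; the function name and the returned dict keys are hypothetical, not the hook's actual `LookerApiSettings` implementation.

```python
import json
from typing import Any, Dict, Optional


def looker_settings_from_connection(
    host: str, port: Optional[int], extra: Optional[str]
) -> Dict[str, Any]:
    """Hypothetical mapping of Airflow connection fields to Looker SDK settings."""
    if not host:
        raise ValueError("A Looker connection needs a host, e.g. https://your.looker.com")
    # Host must not include /api/*; the port is left out for GCP-hosted instances.
    base_url = f"{host.rstrip('/')}:{port}" if port else host.rstrip("/")
    extras = json.loads(extra) if extra else {}
    # Extras are JSON strings in the documented example, e.g. "true" and "120".
    verify_ssl = str(extras.get("verify_ssl", "true")).lower() != "false"
    timeout = int(extras.get("timeout", 120))
    return {"base_url": base_url, "verify_ssl": verify_ssl, "timeout": timeout}


print(looker_settings_from_connection(
    "https://your.looker.com", 19999, '{"verify_ssl": "true", "timeout": "120"}'
))
# {'base_url': 'https://your.looker.com:19999', 'verify_ssl': True, 'timeout': 120}
```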







[GitHub] [airflow] alekseiloginov edited a comment on pull request #20882: Add Looker PDT operators

alekseiloginov edited a comment on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1057607646


   @mik-laj, thanks for approving the PR! 
   
   I see the build fails with the following error:
   ```
     #33 30.11 ERROR: Could not find a version that satisfies the requirement looker-sdk>=22.2.0 (from apache-airflow[devel-ci]) (from versions: 0.1.3b1, 0.1.3b2, 0.1.3b3, 0.1.3b4, 0.1.3b20, 7.20.0, 21.0.0, 21.4.0, 21.4.1, 21.6.0, 21.6.1, 21.8.0, 21.8.1, 21.10.0, 21.10.1, 21.12.1, 21.12.2, 21.14.0, 21.16.0, 21.18.0, 21.20.0)
     #33 30.11 ERROR: No matching distribution found for looker-sdk>=22.2.0
   ```
   looker-sdk 22.2 should be available on PyPI: https://pypi.org/project/looker-sdk/
   
   I know that for the Looker SDK we pinned Python 3.9 at 3.9.9 due to some [dependency issues](https://github.com/looker-open-source/sdk-codegen/pull/949/files#diff-1f6e9b98b43ba02030c0cf38755b434fe71bc0ae048346331d0823504d6216f0R35), and it looks like the CI here is using Python 3.9.10. Could that be why the build fails?
   
   I also see that the Python 3.8 and 3.7 builds are cancelled once the 3.9 build fails. Could we run them separately?
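Both the `looker-sdk>=22.2.0` requirement and the hook's runtime guard (`parse_version(looker_ver) < parse_version("22.2.0")`) compare release numbers component by component, not as strings. A stdlib-only sketch of why that matters, assuming plain `MAJOR.MINOR.PATCH` versions (unlike `packaging.version`, it does not handle pre-releases such as `0.1.3b1`):

```python
from typing import Tuple


def release_tuple(version: str) -> Tuple[int, ...]:
    """Split a plain dotted release like '22.2.0' into integers (no pre-release support)."""
    return tuple(int(part) for part in version.split("."))


def meets_minimum(version: str, minimum: str = "22.2.0") -> bool:
    """Numeric, component-wise comparison, as packaging.version does for plain releases."""
    return release_tuple(version) >= release_tuple(minimum)


# Plain string comparison gets double-digit components wrong:
print("22.10.0" >= "22.2.0")     # False (character '1' sorts before '2')
print(meets_minimum("22.10.0"))  # True
print(meets_minimum("21.20.0"))  # False: newest version the CI resolver could see
```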





[GitHub] [airflow] alekseiloginov commented on a change in pull request #20882: Add Looker PDT operators

alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r818210254



##########
File path: docs/apache-airflow-providers-google/operators/cloud/looker.rst
##########
@@ -0,0 +1,71 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information

Review comment:
       Done! ✅

##########
File path: docs/apache-airflow-providers-google/operators/cloud/looker.rst
##########
@@ -0,0 +1,71 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+Google Cloud Looker Operators
+===============================
+
+Looker is a business intelligence software and big data analytics platform that
+helps you explore, analyze and share real-time business analytics easily.
+
+Looker has a Public API and associated SDK clients in different languages,
+which allow programmatic access to the Looker data platform.
+
+For more information visit `Looker API documentation <https://docs.looker.com/reference/api-and-integration>`_.
+
+Prerequisite Tasks
+------------------
+
+To use these operators, you must do a few things:
+
+* Install API libraries via **pip**.
+
+.. code-block:: bash
+
+  pip install 'apache-airflow[google]'
+
+Detailed information is available for :doc:`Installation <apache-airflow:installation/index>`.
+
+* Setup a Looker connection in Airflow. You can check :doc:`apache-airflow:howto/connection` and :doc:`apache-airflow-providers-google:connections/gcp_looker`
+
+Start a PDT materialization job
+-------------------------------
+
+To submit a PDT materialization job to Looker you need to provide a model and view name.
+
+The job configuration can be submitted in synchronous (blocking) mode by using:
+:class:`~airflow.providers.google.cloud.operators.looker.LookerStartPdtBuildOperator`.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_looker.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_looker_start_pdt_build_operator]
+    :end-before: [END how_to_cloud_looker_start_pdt_build_operator]
+
+
+Alternatively, the job configuration can be submitted in asynchronous mode by using:
+:class:`~airflow.providers.google.cloud.operators.looker.LookerStartPdtBuildOperator` and
+:class:`~airflow.providers.google.cloud.sensors.looker.LookerCheckPdtBuildSensor`.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_looker.py
+    :language: python
+    :dedent: 4
+    :start-after: [START cloud_looker_async_start_pdt_sensor]
+    :end-before: [END cloud_looker_async_start_pdt_sensor]
+
+There are more arguments to provide in the jobs than the examples show.
+For the complete list of arguments take a look at Looker operator arguments at
+`GitHub <https://github.com/apache/airflow/blob/main/airflow/providers/google/cloud/operators/looker.py>`__

Review comment:
       Done! ✅
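The synchronous (blocking) mode described on this page relies on the hook's `wait_for_job` loop: poll the job status every `wait_time` seconds until it reaches a terminal state, then fail on timeout, error, cancellation, or an unknown status. A self-contained sketch of that loop; `get_status` is a hypothetical stand-in for `LookerHook.pdt_build_status`, and the lowercase status strings are assumptions, since the `JobStatus` enum's values are not shown in this thread.

```python
import time
from typing import Callable, Dict, Optional

# Assumed terminal status strings; the real JobStatus values are not shown in this thread.
DONE, ERROR, CANCELLED, UNKNOWN = "done", "error", "cancelled", "unknown"
TERMINAL = {DONE, ERROR, CANCELLED, UNKNOWN}


def wait_for_pdt(
    get_status: Callable[[], Dict[str, str]],
    wait_time: float = 10,
    timeout: Optional[float] = None,
) -> None:
    """Block until the job reaches a terminal status; raise unless it finished cleanly."""
    start = time.monotonic()
    status_dict: Dict[str, str] = {}
    while status_dict.get("status") not in TERMINAL:
        # Same timeout check as the hook: give up once `timeout` seconds have elapsed.
        if timeout and start + timeout < time.monotonic():
            raise TimeoutError(f"PDT job not ready after {timeout}s")
        time.sleep(wait_time)
        status_dict = get_status()
    status = status_dict["status"]
    if status == ERROR:
        raise RuntimeError(f"PDT job failed: {status_dict.get('message', '')}")
    if status in (CANCELLED, UNKNOWN):
        raise RuntimeError(f"PDT job ended with status {status!r}")


# Usage with a fake status source that reports 'running' twice, then 'done':
responses = iter([{"status": "running"}, {"status": "running"}, {"status": DONE}])
wait_for_pdt(lambda: next(responses), wait_time=0)
```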







[GitHub] [airflow] potiuk commented on pull request #20882: Add Looker PDT operators

potiuk commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1057839876


   @alekseiloginov - another option would be to remove the `>=22.2.0` bound on looker-sdk to allow earlier versions that did not have this restriction, but I am not sure whether your changes will be compatible with them. Are there good reasons you added >=22.2.0?





[GitHub] [airflow] potiuk commented on pull request #20882: Add Looker PDT operators

potiuk commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1060770687


   That was a flaky test for mssql: `ERROR: for mssqlsetup  Container "130ef72be1ab" is unhealthy`. Merging.
   





[GitHub] [airflow] alekseiloginov commented on pull request #20882: Add Looker PDT operators

alekseiloginov commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1060081798


   I see a dependency conflict for `typing-extensions`:
   ```
     #33 79.18 ERROR: Cannot install apache-airflow and apache-airflow[devel-ci]==2.3.0.dev0 because these package versions have conflicting dependencies.
     #33 79.18 
     #33 79.18 The conflict is caused by:
     #33 79.18     apache-airflow[devel-ci] 2.3.0.dev0 depends on typing-extensions<4.0.0 and >=3.7.4
     #33 79.18     mypy 0.910 depends on typing-extensions>=3.7.4
     #33 79.18     apache-beam 2.33.0 depends on typing-extensions<4 and >=3.7.0
     #33 79.18     looker-sdk 22.2.1 depends on typing-extensions>=4.1.1
   ```
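The resolver output above is an empty intersection of version ranges: the highest lower bound (looker-sdk's `>=4.1.1`) sits above the lowest upper bound (`<4.0.0`). A stdlib-only sketch that makes this concrete; it handles only the `>=`/`<` bounds that appear in this log.

```python
from typing import List, Optional, Tuple

Version = Tuple[int, int, int]
Range = Tuple[Optional[Version], Optional[Version]]  # [lower, upper), None = unbounded


def v(text: str) -> Version:
    """Parse '4' or '3.7.4' into a zero-padded (major, minor, patch) tuple."""
    parts = [int(p) for p in text.split(".")]
    major, minor, patch = (parts + [0, 0, 0])[:3]
    return (major, minor, patch)


def intersect(ranges: List[Range]) -> Optional[Tuple[Version, Optional[Version]]]:
    """Intersect half-open version ranges; return None when no version fits all of them."""
    lowers = [lo for lo, _ in ranges if lo is not None]
    uppers = [hi for _, hi in ranges if hi is not None]
    lower = max(lowers) if lowers else (0, 0, 0)
    upper = min(uppers) if uppers else None
    if upper is not None and lower >= upper:
        return None
    return lower, upper


# typing-extensions constraints from the resolver log above
typing_extensions: List[Range] = [
    (v("3.7.4"), v("4.0.0")),  # apache-airflow[devel-ci] 2.3.0.dev0
    (v("3.7.4"), None),        # mypy 0.910
    (v("3.7.0"), v("4")),      # apache-beam 2.33.0
    (v("4.1.1"), None),        # looker-sdk 22.2.1
]
print(intersect(typing_extensions))      # None
print(intersect(typing_extensions[:3]))  # ((3, 7, 4), (4, 0, 0))
```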





[GitHub] [airflow] potiuk commented on pull request #20882: Add Looker PDT operators

potiuk commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1060701799


   :crossed_fingers: !





[GitHub] [airflow] turbaszek commented on a change in pull request #20882: [WIP] Add Looker PDT operators

turbaszek commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r812167485



##########
File path: airflow/providers/google/cloud/example_dags/example_looker.py
##########
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG that show how to use various Looker
+operators to submit PDT materialization job and manage it.
+"""
+
+from datetime import datetime
+
+from airflow import models
+from airflow.providers.google.cloud.operators.looker import LookerStartPdtBuildOperator
+from airflow.providers.google.cloud.sensors.looker import LookerCheckPdtBuildSensor
+
+with models.DAG(
+    dag_id='example_gcp_looker',
+    schedule_interval='@once',
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+) as dag:
+    # [START cloud_looker_async_start_pdt_sensor]
+    start_pdt_task_async = LookerStartPdtBuildOperator(
+        task_id='looker_pdt_task_async',

Review comment:
       ```suggestion
           task_id='start_pdt_task_async',
   ```
   It's not required, but it's good practice to keep `task_id` aligned with the variable name. It's more consistent 👍 
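In the asynchronous pattern this example DAG sets up, the operator returns right after submitting the build and a sensor checks the status on each poke instead of blocking. A hypothetical sketch of the per-poke decision, mirroring the terminal-state handling of the hook's `wait_for_job`; the function name and the lowercase status strings are assumptions, not `LookerCheckPdtBuildSensor`'s actual code.

```python
from typing import Dict

# Assumed status strings; the real JobStatus values are not shown in this thread.
FAILED_STATES = {"error", "cancelled", "unknown"}


def poke_pdt_status(status_dict: Dict[str, str]) -> bool:
    """Return True when the build is done, False to poke again; raise on failure."""
    status = status_dict["status"]
    if status == "done":
        return True
    if status in FAILED_STATES:
        message = status_dict.get("message", "")
        raise RuntimeError(f"PDT materialization ended with status {status!r}. {message}".strip())
    return False  # still queued or running: let the sensor reschedule the next poke
```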







[GitHub] [airflow] alekseiloginov removed a comment on pull request #20882: Add Looker PDT operators

alekseiloginov removed a comment on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1049880526


   /retest





[GitHub] [airflow] mik-laj commented on a change in pull request #20882: Add Looker PDT operators

mik-laj commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r817943962



##########
File path: airflow/providers/google/cloud/hooks/looker.py
##########
@@ -0,0 +1,257 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains a Google Cloud Looker hook."""
+
+import json
+import time
+from enum import Enum
+from typing import Dict, Optional
+
+from looker_sdk.rtl import api_settings, auth_session, requests_transport, serialize
+from looker_sdk.sdk.api40 import methods as methods40
+from packaging.version import parse as parse_version
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models.connection import Connection
+from airflow.version import version
+
+
+class LookerHook(BaseHook):
+    """Hook for Looker APIs."""
+
+    def __init__(
+        self,
+        looker_conn_id: str,
+    ) -> None:
+        super().__init__()
+        self.looker_conn_id = looker_conn_id
+        # source is used to track origin of the requests
+        self.source = f'airflow:{version}'
+
+    def start_pdt_build(
+        self,
+        model: str,
+        view: str,
+        query_params: Optional[Dict] = None,
+    ):
+        """
+        Submits a PDT materialization job to Looker.
+
+        :param model: Required. The model of the PDT to start building.
+        :param view: Required. The view of the PDT to start building.
+        :param query_params: Optional. Additional materialization parameters.
+        """
+        self.log.info("Submitting PDT materialization job. Model: '%s', view: '%s'.", model, view)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        sdk = self.get_looker_sdk()
+        looker_ver = sdk.versions().looker_release_version
+        if parse_version(looker_ver) < parse_version("22.2.0"):
+            raise AirflowException(f'This API requires Looker version 22.2+. Found: {looker_ver}.')
+
+        # unpack query_params dict into kwargs (if not None)
+        if query_params:
+            resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source, **query_params)
+        else:
+            resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source)
+
+        self.log.info("Start PDT build response: '%s'.", resp)
+
+        return resp
+
+    def check_pdt_build(
+        self,
+        materialization_id: str,
+    ):
+        """
+        Gets the PDT materialization job status from Looker.
+
+        :param materialization_id: Required. The materialization id to check status for.
+        """
+        self.log.info("Requesting PDT materialization job status. Job id: %s.", materialization_id)
+
+        sdk = self.get_looker_sdk()
+        resp = sdk.check_pdt_build(materialization_id=materialization_id)
+
+        self.log.info("Check PDT build response: '%s'.", resp)
+        return resp
+
+    def pdt_build_status(
+        self,
+        materialization_id: str,
+    ) -> Dict:
+        """
+        Gets the PDT materialization job status.
+
+        :param materialization_id: Required. The materialization id to check status for.
+        """
+        resp = self.check_pdt_build(materialization_id=materialization_id)
+
+        status_json = resp['resp_text']
+        status_dict = json.loads(status_json)
+
+        self.log.info(
+            "PDT materialization job id: %s. Status: '%s'.", materialization_id, status_dict['status']
+        )
+
+        return status_dict
+
+    def stop_pdt_build(
+        self,
+        materialization_id: str,
+    ):
+        """
+        Starts a PDT materialization job cancellation request.
+
+        :param materialization_id: Required. The materialization id to stop.
+        """
+        self.log.info("Stopping PDT materialization. Job id: %s.", materialization_id)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        sdk = self.get_looker_sdk()
+        resp = sdk.stop_pdt_build(materialization_id=materialization_id, source=self.source)
+
+        self.log.info("Stop PDT build response: '%s'.", resp)
+        return resp
+
+    def wait_for_job(
+        self,
+        materialization_id: str,
+        wait_time: int = 10,
+        timeout: Optional[int] = None,
+    ) -> None:
+        """
+        Helper method which polls a PDT materialization job to check if it finishes.
+
+        :param materialization_id: Required. The materialization id to wait for.
+        :param wait_time: Optional. Number of seconds between checks.
+        :param timeout: Optional. How many seconds wait for job to be ready.
+        Used only if ``asynchronous`` is False.
+        """
+        self.log.info('Waiting for PDT materialization job to complete. Job id: %s.', materialization_id)
+
+        status = None
+        start = time.monotonic()
+
+        while status not in (
+            JobStatus.DONE.value,
+            JobStatus.ERROR.value,
+            JobStatus.CANCELLED.value,
+            JobStatus.UNKNOWN.value,
+        ):
+
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(
+                    f"Timeout: PDT materialization job is not ready after {timeout}s. "
+                    f"Job id: {materialization_id}."
+                )
+
+            time.sleep(wait_time)
+
+            status_dict = self.pdt_build_status(materialization_id=materialization_id)
+            status = status_dict['status']
+
+        if status == JobStatus.ERROR.value:
+            msg = status_dict['message']
+            raise AirflowException(
+                f'PDT materialization job failed. Job id: {materialization_id}. Message:\n"{msg}"'
+            )
+        if status == JobStatus.CANCELLED.value:
+            raise AirflowException(f'PDT materialization job was cancelled. Job id: {materialization_id}.')
+        if status == JobStatus.UNKNOWN.value:
+            raise AirflowException(
+                f'PDT materialization job has unknown status. Job id: {materialization_id}.'
+            )
+
+        self.log.info('PDT materialization job completed successfully. Job id: %s.', materialization_id)
+
+    def get_looker_sdk(self):
+        """Returns Looker SDK client for Looker API 4.0."""
+
+        conn = self.get_connection(self.looker_conn_id)
+        settings = LookerApiSettings(conn)
+
+        transport = requests_transport.RequestsTransport.configure(settings)
+        return methods40.Looker40SDK(
+            auth_session.AuthSession(settings, transport, serialize.deserialize40, "4.0"),
+            serialize.deserialize40,
+            serialize.serialize,
+            transport,
+            "4.0",
+        )
+
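The polling loop in `wait_for_job` above can be exercised without a Looker instance. The following is a minimal sketch, not the hook's actual code. It reproduces the same control flow (check the timeout, sleep, fetch the status, stop on a terminal state, then raise on any non-success state), with the status fetcher, clock, and sleep injected so it can be tested deterministically. The `JobStatus` string values, `JobFailed`, `poll_until_terminal`, and `FakeClock` are hypothetical. The real enum lives in the hook module and is not shown in this hunk.

```python
import time
from enum import Enum
from typing import Callable, Dict, Optional


class JobStatus(Enum):
    # Hypothetical values: the hook's real JobStatus enum is not part of this hunk.
    PENDING = "pending"
    RUNNING = "running"
    DONE = "done"
    ERROR = "error"
    CANCELLED = "cancelled"
    UNKNOWN = "unknown"


TERMINAL = {JobStatus.DONE.value, JobStatus.ERROR.value, JobStatus.CANCELLED.value, JobStatus.UNKNOWN.value}


class JobFailed(Exception):
    """Stand-in for AirflowException so the sketch runs without Airflow."""


def poll_until_terminal(
    get_status: Callable[[], Dict],
    wait_time: float = 10,
    timeout: Optional[float] = None,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict:
    """Poll get_status() until a terminal state, mirroring wait_for_job's control flow."""
    start = clock()
    status_dict: Dict = {}
    while status_dict.get("status") not in TERMINAL:
        # As in wait_for_job, the timeout is checked *before* sleeping, so the total
        # wait can overshoot `timeout` by up to one `wait_time` plus one status request.
        if timeout and start + timeout < clock():
            raise JobFailed(f"Timeout: job is not ready after {timeout}s.")
        sleep(wait_time)
        status_dict = get_status()

    status = status_dict["status"]
    if status == JobStatus.ERROR.value:
        raise JobFailed(f"Job failed. Message: {status_dict.get('message')!r}")
    if status == JobStatus.CANCELLED.value:
        raise JobFailed("Job was cancelled.")
    if status == JobStatus.UNKNOWN.value:
        raise JobFailed("Job has unknown status.")
    return status_dict


class FakeClock:
    """Deterministic clock: sleep() advances simulated time instead of blocking."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


clock = FakeClock()
responses = iter([{"status": "running"}, {"status": "done"}])
result = poll_until_terminal(lambda: next(responses), wait_time=5, clock=clock, sleep=clock.sleep)
print(result, clock.now)  # {'status': 'done'} 10.0
```

One consequence of checking the timeout before each sleep: with `wait_time=5` and `timeout=12`, a job that never finishes is only abandoned after 15 simulated seconds.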

Review comment:
       What do you think about adding the `test_connection` method to make it possible to check the connection configuration from the UI?
   https://github.com/apache/airflow/blob/602abe8394fafe7de54df7e73af56de848cdf617/airflow/providers/airbyte/hooks/airbyte.py#L110-L127




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r818077329



##########
File path: docs/apache-airflow-providers-google/operators/cloud/looker.rst
##########
@@ -0,0 +1,71 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+Google Cloud Looker Operators
+===============================
+
+Looker is business intelligence software and a big data analytics platform that
+helps you explore, analyze and share real-time business analytics easily.
+
+Looker has a Public API and associated SDK clients in different languages,
+which allow programmatic access to the Looker data platform.
+
+For more information visit `Looker API documentation <https://docs.looker.com/reference/api-and-integration>`_.
+
+Prerequisite Tasks
+------------------
+
+To use these operators, you must do a few things:
+
+* Install API libraries via **pip**.
+
+.. code-block:: bash
+
+  pip install 'apache-airflow[google]'
+
+Detailed information is available for :doc:`Installation <apache-airflow:installation/index>`.
+
+* Set up a Looker connection in Airflow. You can check :doc:`apache-airflow:howto/connection` and :doc:`apache-airflow-providers-google:connections/gcp_looker`.
+
+Start a PDT materialization job
+-------------------------------
+
+To submit a PDT materialization job to Looker, you need to provide a model name and a view name.
+
+The job configuration can be submitted in synchronous (blocking) mode by using:
+:class:`~airflow.providers.google.cloud.operators.looker.LookerStartPdtBuildOperator`.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_looker.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_looker_start_pdt_build_operator]
+    :end-before: [END how_to_cloud_looker_start_pdt_build_operator]
+
+
+Alternatively, the job configuration can be submitted in asynchronous mode by using:
+:class:`~airflow.providers.google.cloud.operators.looker.LookerStartPdtBuildOperator` and
+:class:`~airflow.providers.google.cloud.sensors.looker.LookerCheckPdtBuildSensor`.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_looker.py
+    :language: python
+    :dedent: 4
+    :start-after: [START cloud_looker_async_start_pdt_sensor]
+    :end-before: [END cloud_looker_async_start_pdt_sensor]
+
+There are more arguments to provide in the jobs than the examples show.
+For the complete list of arguments take a look at Looker operator arguments at
+`GitHub <https://github.com/apache/airflow/blob/main/airflow/providers/google/cloud/operators/looker.py>`__

Review comment:
       We shouldn't refer to GitHub, as we have generated reference documentation for each operator.
   https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/vision/index.html#airflow.providers.google.cloud.operators.vision.CloudVisionGetProductSetOperator
   For example, to generate a link to the `airflow.providers.google.cloud.sensors.looker.LookerCheckPdtBuildSensor` class, use the following syntax:
   ```
   :class:`airflow.providers.google.cloud.sensors.looker.LookerCheckPdtBuildSensor`
   ```
   Here is docs: https://www.sphinx-doc.org/en/master/usage/restructuredtext/domains.html#python-roles







[GitHub] [airflow] alekseiloginov commented on pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1057607646


   @mik-laj, thanks for approving the PR! 
   
   I see the build fails with the following error:
   ```
     #33 30.11 ERROR: Could not find a version that satisfies the requirement looker-sdk>=22.2.0 (from apache-airflow[devel-ci]) (from versions: 0.1.3b1, 0.1.3b2, 0.1.3b3, 0.1.3b4, 0.1.3b20, 7.20.0, 21.0.0, 21.4.0, 21.4.1, 21.6.0, 21.6.1, 21.8.0, 21.8.1, 21.10.0, 21.10.1, 21.12.1, 21.12.2, 21.14.0, 21.16.0, 21.18.0, 21.20.0)
     #33 30.11 ERROR: No matching distribution found for looker-sdk>=22.2.0
   ```
   looker-sdk 22.2 should be available on PyPI: https://pypi.org/project/looker-sdk/
   
   I know that we pinned Python 3.9 at version 3.9.9 due to some [dependency issues](https://github.com/looker-open-source/sdk-codegen/pull/949/files#diff-1f6e9b98b43ba02030c0cf38755b434fe71bc0ae048346331d0823504d6216f0R35). Could that be why the build fails?
   
   I also see that the Python 3.8 and 3.7 builds are cancelled once the 3.9 build fails. Could we run them separately?





[GitHub] [airflow] boring-cyborg[bot] commented on pull request #20882: [WIP] Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1013350915


   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, mypy and type annotations). Our [pre-commits]( https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature, add useful documentation (in docstrings or in the `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for testing locally, it’s a heavy docker but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style]( https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   





[GitHub] [airflow] potiuk commented on pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1060533688


   Right. The constraints are refreshed, but typing-extensions is indeed too limiting. I'm trying to find out what the problems are and fix them in https://github.com/apache/airflow/pull/22046





[GitHub] [airflow] alekseiloginov commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r819734146



##########
File path: docs/apache-airflow-providers-google/operators/cloud/looker.rst
##########
@@ -39,29 +39,7 @@ To use these operators, you must do a few things:
 
 Detailed information is available for :doc:`Installation <apache-airflow:installation/index>`.
 
-Communication between Airflow and Looker is done via `Looker API <https://docs.looker.com/reference/api-and-integration/api-reference/v4.0>`_.
-To facilitate the API communication Looker operators use `Looker SDK <https://pypi.org/project/looker-sdk/>`_ as an API client.
-Before calling API, Looker SDK needs to authenticate itself using your Looker API credentials.
-
-* Obtain your Looker API credentials using instructions in the `Looker API documentation <https://docs.looker.com/reference/api-and-integration/api-auth#authentication_with_an_sdk>`_.
-
-* Obtain your Looker API path and port as described in the `Looker API documentation <https://docs.looker.com/reference/api-and-integration/api-getting-started#looker_api_path_and_port>`_.
-
-* Setup a Looker connection in Airflow. You can check :doc:`apache-airflow:howto/connection`
-
-See the following example of a connection setup:
-
-.. code-block::
-
-  Connection Id: your_conn_id  # Passed to operators as ``looker_conn_id`` parameter.
-  Connection Type: HTTP
-  Host: https://your.looker.com  # Base URL for Looker API. Do not include /api/* in the URL.
-  Login: YourClientID  # Looker API client id
-  Password: YourClientSecret  # Looker API client secret
-  Port: 19999  # Port for Looker API. If hosted on GCP, don't specify the port leaving just the host.
-  # Extras are optional parameters in JSON format.
-  Extra: {"verify_ssl": "true",  # Set to false only if testing locally against self-signed certs. Defaults to true if not specified.
-          "timeout": "120"}  # Timeout in seconds for HTTP requests. Defaults to 2 minutes (120) seconds if not specified.
+* Set up a Looker connection in Airflow. You can check :doc:`apache-airflow:howto/connection` and :doc:`apache-airflow-providers-google:connections/gcp_looker`.

Review comment:
       Hmm, I added
   ```
   :doc:`connections/gcp_looker`
   ```
   but I'm still getting
   ```
   unknown document: connections/gcp_looker
   ```
   https://github.com/apache/airflow/runs/5424940888?check_suite_focus=true







[GitHub] [airflow] potiuk removed a comment on pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
potiuk removed a comment on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1060791706


   Preparing the release now. Good stuff. I just checked that the last Python 3.10 obstacle is removed, and we **just** need to release all providers with Python 3.10 support to be able to add 3.10 :)





[GitHub] [airflow] alekseiloginov commented on pull request #20882: [WIP] Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1020329721


   > Can you please rebase to latest main ?
   
   Thanks for reminder @potiuk, rebased!





[GitHub] [airflow] lwyszomi commented on a change in pull request #20882: [WIP] Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
lwyszomi commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r797593303



##########
File path: airflow/providers/google/cloud/operators/looker.py
##########
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains Google Cloud Looker operators."""
+
+from typing import Optional, Dict
+
+from airflow.models.baseoperator import BaseOperator
+from airflow.providers.google.cloud.hooks.looker import LookerHook
+
+
+class LookerStartPdtBuildOperator(BaseOperator):
+    """
+    Submits a PDT materialization job to Looker.
+
+    :param looker_conn_id: Required. The connection ID to use when connecting to Looker.
+    :type looker_conn_id: str
+    :param model: Required. The model of the PDT to start building.
+    :type model: str
+    :param view: Required. The view of the PDT to start building.
+    :type view: str
+    :param query_params: Optional. Additional materialization parameters.
+    :type query_params: Dict
+    :param asynchronous: Optional. Flag indicating whether to wait for the job to finish or return immediately.
+        This is useful for submitting long running jobs and
+        waiting on them asynchronously using the LookerCheckPdtBuildSensor
+    :type asynchronous: bool
+    :param cancel_on_kill: Optional. Flag which indicates whether to cancel the hook's job or not when on_kill is called.
+    :type cancel_on_kill: bool
+    :param wait_time: Optional. Number of seconds between checks for job to be ready. Used only if ``asynchronous`` is False.
+    :type wait_time: int
+    :param wait_timeout: Optional. How many seconds to wait for the job to be ready. Used only if ``asynchronous`` is False.
+    :type wait_timeout: int
+    """
+
+    def __init__(
+        self,
+        looker_conn_id: str,
+        model: str,
+        view: str,
+        query_params: Optional[Dict] = None,
+        asynchronous: bool = False,
+        cancel_on_kill: bool = True,
+        wait_time: int = 10,
+        wait_timeout: Optional[int] = None,
+        **kwargs) -> None:
+        super().__init__(**kwargs)
+        self.model = model
+        self.view = view
+        self.query_params = query_params
+        self.looker_conn_id = looker_conn_id
+        self.asynchronous = asynchronous
+        self.cancel_on_kill = cancel_on_kill
+        self.wait_time = wait_time
+        self.wait_timeout = wait_timeout
+        self.hook: Optional[LookerHook] = None
+        self.materialization_id: Optional[str] = None
+
+    def execute(self, context):

Review comment:
       As of Airflow 2.2.3 there is a specific `Context` object available for typing. To make sure the operator doesn't have any implicit Airflow version dependency, you should use `TYPE_CHECKING`:
   
   Here is an example:
   ```python
   if TYPE_CHECKING:
       from airflow.utils.context import Context
   ...
   
   def execute(self, context: "Context"): ...
   ```
   
   This can also be applied to the Sensor.
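Here is a runnable version of the pattern above, applied to a sensor-style `poke` method. It runs even without Airflow installed, which shows that the guarded import is never executed at runtime. `LookerPdtSensorSketch` and the `"done"` status string are hypothetical.

```python
from typing import TYPE_CHECKING, Dict

if TYPE_CHECKING:
    # Evaluated only by static type checkers such as mypy. At runtime
    # TYPE_CHECKING is False, so this import never executes and the module
    # still loads where airflow.utils.context.Context is not available.
    from airflow.utils.context import Context


class LookerPdtSensorSketch:
    """Hypothetical sensor-like class; a real sensor would subclass BaseSensorOperator."""

    def __init__(self, statuses: Dict[str, str], materialization_id: str) -> None:
        self.statuses = statuses
        self.materialization_id = materialization_id

    # The quoted annotation is stored as a string and never resolved at runtime,
    # so the name Context does not need to exist when this method runs.
    def poke(self, context: "Context") -> bool:
        return self.statuses.get(self.materialization_id) == "done"


sensor = LookerPdtSensorSketch({"m-1": "done", "m-2": "running"}, "m-1")
print(sensor.poke(context={}))  # True
```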

##########
File path: airflow/providers/google/cloud/operators/looker.py
##########
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains Google Cloud Looker operators."""
+
+from typing import Optional, Dict
+
+from airflow.models.baseoperator import BaseOperator
+from airflow.providers.google.cloud.hooks.looker import LookerHook
+
+
+class LookerStartPdtBuildOperator(BaseOperator):

Review comment:
       We are in the process of adding links to the GCP resources for most of the operators. Maybe this is a good time to add them to the Looker operators as well.
   
   You can find an example of how to do this in the Dataproc Metastore PR:
   https://github.com/apache/airflow/pull/21267







[GitHub] [airflow] alekseiloginov commented on a change in pull request #20882: [WIP] Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r797633603



##########
File path: airflow/providers/google/cloud/operators/looker.py
##########
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains Google Cloud Looker operators."""
+
+from typing import Optional, Dict
+
+from airflow.models.baseoperator import BaseOperator
+from airflow.providers.google.cloud.hooks.looker import LookerHook
+
+
+class LookerStartPdtBuildOperator(BaseOperator):

Review comment:
       Got it! Is it the same as the [extra links](https://airflow.apache.org/docs/apache-airflow-providers/core-extensions/extra-links.html)?







[GitHub] [airflow] potiuk merged pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #20882:
URL: https://github.com/apache/airflow/pull/20882


   





[GitHub] [airflow] potiuk commented on pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1059818050


   Yep. Thanks @mik-laj. Those are due to some dependency upgrades. I am looking at them separately (agreed on that with @alekseiloginov :)). I want to upgrade those deps in a separate PR, as I need them anyway to add Python 3.10 and ARM/M1 support.





[GitHub] [airflow] potiuk commented on pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1060069526


   @alekseiloginov, you should be able to rebase now! Please let me know when you have :D





[GitHub] [airflow] turbaszek commented on a change in pull request #20882: [WIP] Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r812166484



##########
File path: airflow/providers/google/cloud/example_dags/example_looker.py
##########
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG that show how to use various Looker
+operators to submit PDT materialization job and manage it.
+"""
+
+from datetime import datetime
+
+from airflow import models
+from airflow.providers.google.cloud.operators.looker import LookerStartPdtBuildOperator
+from airflow.providers.google.cloud.sensors.looker import LookerCheckPdtBuildSensor
+
+with models.DAG(
+    dag_id='example_gcp_looker',
+    schedule_interval='@once',

Review comment:
       ```suggestion
       schedule_interval=None,
   ```
   This way, if someone enables the example DAGs, this DAG won't be executed automatically.







[GitHub] [airflow] alekseiloginov edited a comment on pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov edited a comment on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1059180977


   Thanks to @joeldodge79, we [unpinned](https://github.com/looker-open-source/sdk-codegen/pull/1015) Python 3.9.9 and released a new [looker sdk 2.2.1](https://pypi.org/project/looker-sdk/) version for it.
   With this, I will rerun CI now.





[GitHub] [airflow] alekseiloginov edited a comment on pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov edited a comment on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1059202623


   CI is failing with dependency conflict between `apache-airflow` and `apache-airflow[devel-ci]`:
   ```
     #32 96.41 ERROR: Cannot install apache-airflow and apache-airflow[devel-ci]==2.3.0.dev0 because these package versions have conflicting dependencies.
     #32 96.41 
     #32 96.41 The conflict is caused by:
     #32 96.41     apache-airflow[devel-ci] 2.3.0.dev0 depends on cloudpickle<1.5.0 and >=1.4.1
     #32 96.41     apache-beam 2.36.0 depends on cloudpickle<3 and >=2.0.0
   ```
   
   https://github.com/apache/airflow/runs/5423270889?check_suite_focus=true
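The pip error above comes down to two version ranges for `cloudpickle` that do not intersect, so no single version can satisfy both requirements at once. Below is a minimal sketch of that check. It uses simplified numeric version parsing rather than full PEP 440 semantics (the `packaging` library's `SpecifierSet` is the proper tool for real resolution). `parse` and `ranges_overlap` are hypothetical helpers.

```python
from typing import Tuple


def parse(version: str) -> Tuple[int, ...]:
    """Parse a plain numeric release like "1.4.1"; pre/post/dev tags are out of scope."""
    parts = [int(p) for p in version.split(".")]
    # Pad to three components so "3" compares equal to "3.0.0".
    return tuple(parts + [0] * max(0, 3 - len(parts)))


def ranges_overlap(low_a: str, high_a: str, low_b: str, high_b: str) -> bool:
    """True if the half-open ranges [low_a, high_a) and [low_b, high_b) share a version."""
    lower = max(parse(low_a), parse(low_b))
    upper = min(parse(high_a), parse(high_b))
    return lower < upper


# apache-airflow[devel-ci] 2.3.0.dev0: cloudpickle>=1.4.1,<1.5.0
# apache-beam 2.36.0:                  cloudpickle>=2.0.0,<3
print(ranges_overlap("1.4.1", "1.5.0", "2.0.0", "3"))  # False
```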





[GitHub] [airflow] mik-laj commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r813974499



##########
File path: airflow/providers/google/cloud/operators/looker.py
##########
@@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains Google Cloud Looker operators."""
+
+from typing import TYPE_CHECKING, Dict, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.hooks.looker import LookerHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class LookerStartPdtBuildOperator(BaseOperator):
+    """
+    Submits a PDT materialization job to Looker.
+
+    :param looker_conn_id: Required. The connection ID to use when connecting to Looker.
+    :param model: Required. The model of the PDT to start building.
+    :param view: Required. The view of the PDT to start building.
+    :param query_params: Optional. Additional materialization parameters.
+    :param asynchronous: Optional. Flag indicating whether to wait for the job
+    to finish or return immediately.
+    This is useful for submitting long running jobs and
+    waiting on them asynchronously using the LookerCheckPdtBuildSensor
+    :param cancel_on_kill: Optional. Flag which indicates whether to cancel the
+    hook's job or not when on_kill is called.
+    :param wait_time: Optional. Number of seconds between checks for job to be
+    ready. Used only if ``asynchronous`` is False.
+    :param wait_timeout: Optional. How many seconds wait for job to be ready.
+    Used only if ``asynchronous`` is False.
+    """
+
+    def __init__(
+        self,
+        looker_conn_id: str,
+        model: str,
+        view: str,
+        query_params: Optional[Dict] = None,
+        asynchronous: bool = False,
+        cancel_on_kill: bool = True,
+        wait_time: int = 10,
+        wait_timeout: Optional[int] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.model = model
+        self.view = view
+        self.query_params = query_params
+        self.looker_conn_id = looker_conn_id
+        self.asynchronous = asynchronous
+        self.cancel_on_kill = cancel_on_kill
+        self.wait_time = wait_time
+        self.wait_timeout = wait_timeout
+        self.hook: Optional[LookerHook] = None
+        self.materialization_id: Optional[str] = None
+
+    def execute(self, context: "Context") -> str:
+
+        self.hook = LookerHook(looker_conn_id=self.looker_conn_id)
+
+        resp = self.hook.start_pdt_build(
+            model=self.model,
+            view=self.view,
+            query_params=self.query_params,
+        )
+
+        self.materialization_id = resp.materialization_id
+
+        # materialization_id shouldn't be None, ensure mypy about that
+        assert self.materialization_id is not None

Review comment:
       Please don't use asserts outside tests:
   https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#don-t-use-asserts-outside-tests
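One way to address this, sketched under assumptions: replace the `assert` with an explicit check that raises `AirflowException`. `assert` statements are stripped when Python runs with `-O`, so a `None` id would then pass through silently, while an explicit `raise` is always executed and still narrows `Optional[str]` to `str` for mypy. The helper name `get_materialization_id` is hypothetical, and the fallback exception class exists only so the snippet runs without Airflow installed.

```python
from typing import Any, Optional

try:
    from airflow.exceptions import AirflowException
except ImportError:  # Fallback so the sketch runs without Airflow installed.

    class AirflowException(Exception):  # type: ignore[no-redef]
        """Stand-in for airflow.exceptions.AirflowException."""


def get_materialization_id(resp: Any) -> str:
    """Hypothetical helper: return ``resp.materialization_id`` or raise.

    Unlike ``assert``, this check is not removed by ``python -O``, and the
    explicit raise still lets mypy treat the return value as ``str``.
    """
    materialization_id: Optional[str] = getattr(resp, "materialization_id", None)
    if not materialization_id:  # Treats both None and "" as missing.
        raise AirflowException(
            "Looker did not return a materialization_id for this PDT build request."
        )
    return materialization_id
```

Inside `execute`, the assignment plus `assert` would then collapse to `self.materialization_id = get_materialization_id(resp)`.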




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #20882: [WIP] Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r812168693



##########
File path: airflow/providers/google/cloud/hooks/looker.py
##########
@@ -0,0 +1,308 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains a Google Cloud Looker hook."""
+
+import json
+import time
+from enum import Enum
+from typing import Dict, Optional
+
+# TODO: Temporary import for Looker API response stub, remove once looker sdk 22.2 released
+import attr
+from looker_sdk.rtl import api_settings, auth_session, requests_transport, serialize
+# TODO: Temporary import for Looker API response stub, remove once looker sdk 22.2 released
+from looker_sdk.rtl.model import Model
+from looker_sdk.sdk.api40 import methods as methods40
+from packaging.version import parse as parse_version
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models.connection import Connection
+from airflow.version import version
+
+# TODO: uncomment once looker sdk 22.2 released
+# from looker_sdk.sdk.api40.models import MaterializePDT
+
+
+@attr.s(auto_attribs=True, init=False)
+class MaterializePDT(Model):
+    """
+    TODO: Temporary API response stub. Use Looker API class once looker sdk 22.2 released.

Review comment:
       Do we have a release date? Maybe we can wait if it's soon.







[GitHub] [airflow] mik-laj commented on a change in pull request #20882: Add Looker PDT operators

mik-laj commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r817948115



##########
File path: docs/apache-airflow-providers-google/operators/cloud/looker.rst
##########
@@ -39,29 +39,7 @@ To use these operators, you must do a few things:
 
 Detailed information is available for :doc:`Installation <apache-airflow:installation/index>`.
 
-Communication between Airflow and Looker is done via `Looker API <https://docs.looker.com/reference/api-and-integration/api-reference/v4.0>`_.
-To facilitate the API communication Looker operators use `Looker SDK <https://pypi.org/project/looker-sdk/>`_ as an API client.
-Before calling API, Looker SDK needs to authenticate itself using your Looker API credentials.
-
-* Obtain your Looker API credentials using instructions in the `Looker API documentation <https://docs.looker.com/reference/api-and-integration/api-auth#authentication_with_an_sdk>`_.
-
-* Obtain your Looker API path and port as described in the `Looker API documentation <https://docs.looker.com/reference/api-and-integration/api-getting-started#looker_api_path_and_port>`_.
-
-* Setup a Looker connection in Airflow. You can check :doc:`apache-airflow:howto/connection`
-
-See the following example of a connection setup:
-
-.. code-block::
-
-  Connection Id: your_conn_id  # Passed to operators as ``looker_conn_id`` parameter.
-  Connection Type: HTTP
-  Host: https://your.looker.com  # Base URL for Looker API. Do not include /api/* in the URL.
-  Login: YourClientID  # Looker API client id
-  Password: YourClientSecret  # Looker API client secret
-  Port: 19999  # Port for Looker API. If hosted on GCP, don't specify the port leaving just the host.
-  # Extras are optional parameters in JSON format.
-  Extra: {"verify_ssl": "true",  # Set to false only if testing locally against self-signed certs. Defaults to true if not specified.
-          "timeout": "120"}  # Timeout in seconds for HTTP requests. Defaults to 2 minutes (120) seconds if not specified.
+* Setup a Looker connection in Airflow. You can check :doc:`apache-airflow:howto/connection` and :doc:`apache-airflow-providers-google:connections/gcp_looker`

Review comment:
       Documentation in the same directory should not use the package name.
   These references are VALID for files in the `/docs/apache-airflow-providers-google/` directory:
   ```
   :doc:`apache-airflow:howto/connection`
   :doc:`connections/gcp_looker`
   ```
   This reference is INVALID for files in the `/docs/apache-airflow-providers-google/` directory:
   ```
   :doc:`apache-airflow-providers-google:connections/gcp_looker`
   ```







[GitHub] [airflow] alekseiloginov commented on a change in pull request #20882: Add Looker PDT operators

alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r817032153



##########
File path: airflow/providers/google/cloud/hooks/looker.py
##########
@@ -0,0 +1,268 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains a Google Cloud Looker hook."""
+
+import json
+import time
+from enum import Enum
+from typing import Dict, Optional
+
+from looker_sdk.rtl import api_settings, auth_session, requests_transport, serialize
+from looker_sdk.sdk.api40 import methods as methods40
+from packaging.version import parse as parse_version
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models.connection import Connection
+from airflow.version import version
+
+
+class LookerHook(BaseHook):
+    """Hook for Looker APIs."""
+
+    def __init__(
+        self,
+        looker_conn_id: str,
+    ) -> None:
+        super().__init__()
+        self.looker_conn_id = looker_conn_id
+        self.source = self.get_api_source()
+
+    def start_pdt_build(
+        self,
+        model: str,
+        view: str,
+        query_params: Optional[Dict] = None,
+    ):
+        """
+        Submits a PDT materialization job to Looker.
+
+        :param model: Required. The model of the PDT to start building.
+        :param view: Required. The view of the PDT to start building.
+        :param query_params: Optional. Additional materialization parameters.
+        """
+        self.log.info("Submitting PDT materialization job. Model: '%s', view: '%s'.", model, view)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        sdk = self.get_looker_sdk()
+        looker_ver = sdk.versions().looker_release_version
+        if parse_version(looker_ver) < parse_version("22.2.0"):
+            raise AirflowException(f'This API requires Looker version 22.2+. Found: {looker_ver}.')
+
+        # unpack query_params dict into kwargs (if not None)
+        if query_params:
+            resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source, **query_params)
+        else:
+            resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source)
+
+        self.log.info("Start PDT build response: '%s'.", resp)
+
+        return resp
+
+    def check_pdt_build(
+        self,
+        materialization_id: str,
+    ):
+        """
+        Gets the PDT materialization job status from Looker.
+
+        :param materialization_id: Required. The materialization id to check status for.
+        """
+        self.log.info("Requesting PDT materialization job status. Job id: %s.", materialization_id)
+
+        sdk = self.get_looker_sdk()
+        resp = sdk.check_pdt_build(materialization_id=materialization_id)
+
+        self.log.info("Check PDT build response: '%s'.", resp)
+        return resp
+
+    def pdt_build_status(
+        self,
+        materialization_id: str,
+    ) -> Dict:
+        """
+        Gets the PDT materialization job status.
+
+        :param materialization_id: Required. The materialization id to check status for.
+        """
+        resp = self.check_pdt_build(materialization_id=materialization_id)
+
+        status_json = resp['resp_text']
+        status_dict = json.loads(status_json)
+
+        self.log.info(
+            "PDT materialization job id: %s. Status: '%s'.", materialization_id, status_dict['status']
+        )
+
+        return status_dict
+
+    def stop_pdt_build(
+        self,
+        materialization_id: str,
+    ):
+        """
+        Starts a PDT materialization job cancellation request.
+
+        :param materialization_id: Required. The materialization id to stop.
+        """
+        self.log.info("Stopping PDT materialization. Job id: %s.", materialization_id)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        sdk = self.get_looker_sdk()
+        resp = sdk.stop_pdt_build(materialization_id=materialization_id, source=self.source)
+
+        self.log.info("Stop PDT build response: '%s'.", resp)
+        return resp
+
+    def wait_for_job(
+        self,
+        materialization_id: str,
+        wait_time: int = 10,
+        timeout: Optional[int] = None,
+    ) -> None:
+        """
+        Helper method which polls a PDT materialization job to check if it finishes.
+
+        :param materialization_id: Required. The materialization id to wait for.
+        :param wait_time: Optional. Number of seconds between checks.
+        :param timeout: Optional. How many seconds to wait for the job to be ready.
+        Used only if ``asynchronous`` is False.
+        """
+        self.log.info('Waiting for PDT materialization job to complete. Job id: %s.', materialization_id)
+
+        status = None
+        start = time.monotonic()
+
+        while status not in (
+            JobStatus.DONE.value,
+            JobStatus.ERROR.value,
+            JobStatus.CANCELLED.value,
+            JobStatus.UNKNOWN.value,
+        ):
+
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(
+                    f"Timeout: PDT materialization job is not ready after {timeout}s. "
+                    f"Job id: {materialization_id}."
+                )
+
+            time.sleep(wait_time)
+
+            status_dict = self.pdt_build_status(materialization_id=materialization_id)
+            status = status_dict['status']
+
+        if status == JobStatus.ERROR.value:
+            msg = status_dict['message']
+            raise AirflowException(
+                f'PDT materialization job failed. Job id: {materialization_id}. Message:\n"{msg}"'
+            )
+        if status == JobStatus.CANCELLED.value:
+            raise AirflowException(f'PDT materialization job was cancelled. Job id: {materialization_id}.')
+        if status == JobStatus.UNKNOWN.value:
+            raise AirflowException(
+                f'PDT materialization job has unknown status. Job id: {materialization_id}.'
+            )
+
+        self.log.info('PDT materialization job completed successfully. Job id: %s.', materialization_id)
+
+    def get_api_source(self):
+        """Returns origin of the API call."""
+
+        # Infer API source based on Airflow version, e.g.:
+        # "2.1.4+composer" -> Cloud Composer
+        # "2.1.4" -> Airflow
+
+        self.log.debug(
+            "Got the following version for connection id: '%s'. Version: '%s'.", self.looker_conn_id, version
+        )
+
+        return 'composer' if 'composer' in version else 'airflow'

Review comment:
       We'd like to track adoption of this integration and to distinguish whether the Looker API was called from standalone Airflow or from a managed offering. We are especially interested in Cloud Composer.
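The `wait_for_job` / `pdt_build_status` pair in the hunk above follows a poll-until-terminal pattern: check the deadline, sleep, fetch the status JSON, parse it, and stop on a terminal state. Below is a minimal, dependency-free sketch of that pattern, not the hook itself. Assumptions: the status payload is JSON text with `status` and `message` keys (as `pdt_build_status` reads from `resp['resp_text']`); the status strings are placeholders because the `JobStatus` enum's values are not shown in the hunk; and `fetch_status_json`, `sleep`, and `clock` are hypothetical injected callables so the loop can be exercised without a Looker instance.

```python
import json
import time
from typing import Any, Callable, Dict, Optional

# Assumed status strings: the hook's JobStatus enum values are not shown in
# the hunk, so these are placeholders for illustration only.
DONE, ERROR, CANCELLED, UNKNOWN = "done", "error", "killed", "unknown"
TERMINAL = {DONE, ERROR, CANCELLED, UNKNOWN}


class PdtJobError(Exception):
    """Hypothetical stand-in for AirflowException."""


def wait_for_pdt_job(
    materialization_id: str,
    fetch_status_json: Callable[[str], str],
    wait_time: float = 10,
    timeout: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    """Poll until the job reaches a terminal status; raise unless it is DONE."""
    start = clock()
    status: Optional[str] = None
    status_dict: Dict[str, Any] = {}
    while status not in TERMINAL:
        # Check the deadline before each sleep, as the hook's loop does.
        if timeout is not None and clock() - start > timeout:
            raise PdtJobError(f"Timeout: job {materialization_id} not ready after {timeout}s.")
        sleep(wait_time)
        # Same parsing step as pdt_build_status: JSON text -> dict with 'status'.
        status_dict = json.loads(fetch_status_json(materialization_id))
        status = status_dict["status"]

    if status == ERROR:
        raise PdtJobError(f"Job {materialization_id} failed: {status_dict.get('message')}")
    if status == CANCELLED:
        raise PdtJobError(f"Job {materialization_id} was cancelled.")
    if status == UNKNOWN:
        raise PdtJobError(f"Job {materialization_id} has unknown status.")
```

Injecting `sleep` and `clock` lets a test advance a fake clock instead of waiting in real time. One deliberate difference from the hunk: this sketch tests `timeout is not None`, whereas the hook's `if timeout and ...` treats `timeout=0` as "no timeout".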







[GitHub] [airflow] potiuk commented on pull request #20882: Add Looker PDT operators

potiuk commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1060085568


   Then I think you will need to remove the `typing-extensions` limit from setup.py. Removing it causes the Kubernetes tests to fail, but that would have to be solved.
   
   While the dask limitation was clearly a dask problem, I think solving the `typing-extensions` problem is something you may be able to help with, since you would like to upgrade it :).
   
   Just removing the limit should trigger the update, and you should then be able to see the problems.





[GitHub] [airflow] potiuk commented on pull request #20882: Add Looker PDT operators

potiuk commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1060089716


   Ah, no, I looked closer. It seems there is still some other reason holding Beam back from upgrading to the latest version, but I do not know what it is yet; I will look tomorrow.





[GitHub] [airflow] potiuk commented on pull request #20882: Add Looker PDT operators

potiuk commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1060120061


   Ah, likely the constraints have not been refreshed yet :). Let's see tomorrow morning.





[GitHub] [airflow] alekseiloginov commented on a change in pull request #20882: Add Looker PDT operators

alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r819993056



##########
File path: docs/apache-airflow-providers-google/operators/cloud/looker.rst
##########
@@ -39,29 +39,7 @@ To use these operators, you must do a few things:
 
 Detailed information is available for :doc:`Installation <apache-airflow:installation/index>`.
 
-Communication between Airflow and Looker is done via `Looker API <https://docs.looker.com/reference/api-and-integration/api-reference/v4.0>`_.
-To facilitate the API communication Looker operators use `Looker SDK <https://pypi.org/project/looker-sdk/>`_ as an API client.
-Before calling API, Looker SDK needs to authenticate itself using your Looker API credentials.
-
-* Obtain your Looker API credentials using instructions in the `Looker API documentation <https://docs.looker.com/reference/api-and-integration/api-auth#authentication_with_an_sdk>`_.
-
-* Obtain your Looker API path and port as described in the `Looker API documentation <https://docs.looker.com/reference/api-and-integration/api-getting-started#looker_api_path_and_port>`_.
-
-* Setup a Looker connection in Airflow. You can check :doc:`apache-airflow:howto/connection`
-
-See the following example of a connection setup:
-
-.. code-block::
-
-  Connection Id: your_conn_id  # Passed to operators as ``looker_conn_id`` parameter.
-  Connection Type: HTTP
-  Host: https://your.looker.com  # Base URL for Looker API. Do not include /api/* in the URL.
-  Login: YourClientID  # Looker API client id
-  Password: YourClientSecret  # Looker API client secret
-  Port: 19999  # Port for Looker API. If hosted on GCP, don't specify the port leaving just the host.
-  # Extras are optional parameters in JSON format.
-  Extra: {"verify_ssl": "true",  # Set to false only if testing locally against self-signed certs. Defaults to true if not specified.
-          "timeout": "120"}  # Timeout in seconds for HTTP requests. Defaults to 2 minutes (120) seconds if not specified.
+* Setup a Looker connection in Airflow. You can check :doc:`apache-airflow:howto/connection` and :doc:`apache-airflow-providers-google:connections/gcp_looker`

Review comment:
       Okay, I got it: it just needed to be an absolute path:
   ```
   :doc:`/connections/gcp_looker`
   ```







[GitHub] [airflow] alekseiloginov edited a comment on pull request #20882: Add Looker PDT operators

alekseiloginov edited a comment on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1057607646


   @mik-laj, thanks for approving the PR! 
   
   I see the build fails with the following error:
   ```
     #33 30.11 ERROR: Could not find a version that satisfies the requirement looker-sdk>=22.2.0 (from apache-airflow[devel-ci]) (from versions: 0.1.3b1, 0.1.3b2, 0.1.3b3, 0.1.3b4, 0.1.3b20, 7.20.0, 21.0.0, 21.4.0, 21.4.1, 21.6.0, 21.6.1, 21.8.0, 21.8.1, 21.10.0, 21.10.1, 21.12.1, 21.12.2, 21.14.0, 21.16.0, 21.18.0, 21.20.0)
     #33 30.11 ERROR: No matching distribution found for looker-sdk>=22.2.0
   ```
   looker-sdk 22.2 should be available on PyPI: https://pypi.org/project/looker-sdk/
   
   I know that we pinned Python 3.9 at version 3.9.9 due to some [dependency issues](https://github.com/looker-open-source/sdk-codegen/pull/949/files#diff-1f6e9b98b43ba02030c0cf38755b434fe71bc0ae048346331d0823504d6216f0R35), and it looks like the CI here is using Python 3.9.10. Could that be why the build fails?
   
   I also see that the Python 3.8 and 3.7 builds are cancelled once 3.9 fails. Could we run them separately?





[GitHub] [airflow] potiuk closed pull request #20882: [WIP] Add Looker PDT operators

potiuk closed pull request #20882:
URL: https://github.com/apache/airflow/pull/20882


   





[GitHub] [airflow] mik-laj commented on pull request #20882: Add Looker PDT operators

mik-laj commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1060772436


   🎉 





[GitHub] [airflow] turbaszek commented on a change in pull request #20882: [WIP] Add Looker PDT operators

turbaszek commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r812168047



##########
File path: airflow/providers/google/cloud/hooks/looker.py
##########
@@ -0,0 +1,308 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains a Google Cloud Looker hook."""
+
+import json
+import time
+from enum import Enum
+from typing import Dict, Optional
+
+# TODO: Temporary import for Looker API response stub, remove once looker sdk 22.2 released

Review comment:
       Can you please create an issue for this and link it in the comment?







[GitHub] [airflow] alekseiloginov commented on a change in pull request #20882: Add Looker PDT operators

alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r817914904



##########
File path: airflow/providers/google/cloud/example_dags/example_looker.py
##########
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG that shows how to use various Looker
+operators to submit a PDT materialization job and manage it.
+"""
+
+from datetime import datetime
+
+from airflow import models
+from airflow.providers.google.cloud.operators.looker import LookerStartPdtBuildOperator
+from airflow.providers.google.cloud.sensors.looker import LookerCheckPdtBuildSensor
+
+with models.DAG(
+    dag_id='example_gcp_looker',
+    schedule_interval='@once',

Review comment:
       Done! ✅ 







[GitHub] [airflow] mik-laj commented on a change in pull request #20882: Add Looker PDT operators

mik-laj commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r818074172



##########
File path: docs/apache-airflow-providers-google/operators/cloud/looker.rst
##########
@@ -0,0 +1,71 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information

Review comment:
       The license header is invalid. Several tabs are missing.







[GitHub] [airflow] github-actions[bot] commented on pull request #20882: Add Looker PDT operators

github-actions[bot] commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1057540061


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] alekseiloginov edited a comment on pull request #20882: Add Looker PDT operators

alekseiloginov edited a comment on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1058241182


   As @potiuk kindly offered to hold the providers release for one day, we will try to resolve the dependency issue and unpin python 3.9.9 by tomorrow.





[GitHub] [airflow] alekseiloginov commented on a change in pull request #20882: Add Looker PDT operators

alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r818187023



##########
File path: airflow/providers/google/cloud/hooks/looker.py
##########
@@ -0,0 +1,268 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains a Google Cloud Looker hook."""
+
+import json
+import time
+from enum import Enum
+from typing import Dict, Optional
+
+from looker_sdk.rtl import api_settings, auth_session, requests_transport, serialize
+from looker_sdk.sdk.api40 import methods as methods40
+from packaging.version import parse as parse_version
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models.connection import Connection
+from airflow.version import version
+
+
+class LookerHook(BaseHook):
+    """Hook for Looker APIs."""
+
+    def __init__(

Review comment:
       I talked to @potiuk, and he said that the next Google provider release is happening tomorrow. I will try to implement it by then, but we would really like to launch it with this release to avoid waiting another month for the next one.







[GitHub] [airflow] potiuk commented on pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1057835236


   Honestly, I can't believe Looker developers did it. It's horrible and extremely bad practice. 





[GitHub] [airflow] alekseiloginov commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r817110344



##########
File path: airflow/providers/google/cloud/hooks/looker.py
##########
@@ -0,0 +1,268 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains a Google Cloud Looker hook."""
+
+import json
+import time
+from enum import Enum
+from typing import Dict, Optional
+
+from looker_sdk.rtl import api_settings, auth_session, requests_transport, serialize
+from looker_sdk.sdk.api40 import methods as methods40
+from packaging.version import parse as parse_version
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models.connection import Connection
+from airflow.version import version
+
+
+class LookerHook(BaseHook):
+    """Hook for Looker APIs."""
+
+    def __init__(
+        self,
+        looker_conn_id: str,
+    ) -> None:
+        super().__init__()
+        self.looker_conn_id = looker_conn_id
+        self.source = self.get_api_source()
+
+    def start_pdt_build(
+        self,
+        model: str,
+        view: str,
+        query_params: Optional[Dict] = None,
+    ):
+        """
+        Submits a PDT materialization job to Looker.
+
+        :param model: Required. The model of the PDT to start building.
+        :param view: Required. The view of the PDT to start building.
+        :param query_params: Optional. Additional materialization parameters.
+        """
+        self.log.info("Submitting PDT materialization job. Model: '%s', view: '%s'.", model, view)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        sdk = self.get_looker_sdk()
+        looker_ver = sdk.versions().looker_release_version
+        if parse_version(looker_ver) < parse_version("22.2.0"):
+            raise AirflowException(f'This API requires Looker version 22.2+. Found: {looker_ver}.')
+
+        # unpack query_params dict into kwargs (if not None)
+        if query_params:
+            resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source, **query_params)
+        else:
+            resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source)
+
+        self.log.info("Start PDT build response: '%s'.", resp)
+
+        return resp
+
+    def check_pdt_build(
+        self,
+        materialization_id: str,
+    ):
+        """
+        Gets the PDT materialization job status from Looker.
+
+        :param materialization_id: Required. The materialization id to check status for.
+        """
+        self.log.info("Requesting PDT materialization job status. Job id: %s.", materialization_id)
+
+        sdk = self.get_looker_sdk()
+        resp = sdk.check_pdt_build(materialization_id=materialization_id)
+
+        self.log.info("Check PDT build response: '%s'.", resp)
+        return resp
+
+    def pdt_build_status(
+        self,
+        materialization_id: str,
+    ) -> Dict:
+        """
+        Gets the PDT materialization job status.
+
+        :param materialization_id: Required. The materialization id to check status for.
+        """
+        resp = self.check_pdt_build(materialization_id=materialization_id)
+
+        status_json = resp['resp_text']
+        status_dict = json.loads(status_json)
+
+        self.log.info(
+            "PDT materialization job id: %s. Status: '%s'.", materialization_id, status_dict['status']
+        )
+
+        return status_dict
+
+    def stop_pdt_build(
+        self,
+        materialization_id: str,
+    ):
+        """
+        Starts a PDT materialization job cancellation request.
+
+        :param materialization_id: Required. The materialization id to stop.
+        """
+        self.log.info("Stopping PDT materialization. Job id: %s.", materialization_id)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        sdk = self.get_looker_sdk()
+        resp = sdk.stop_pdt_build(materialization_id=materialization_id, source=self.source)
+
+        self.log.info("Stop PDT build response: '%s'.", resp)
+        return resp
+
+    def wait_for_job(
+        self,
+        materialization_id: str,
+        wait_time: int = 10,
+        timeout: Optional[int] = None,
+    ) -> None:
+        """
+        Helper method which polls a PDT materialization job to check if it finishes.
+
+        :param materialization_id: Required. The materialization id to wait for.
+        :param wait_time: Optional. Number of seconds between checks.
+        :param timeout: Optional. How many seconds wait for job to be ready.
+        Used only if ``asynchronous`` is False.
+        """
+        self.log.info('Waiting for PDT materialization job to complete. Job id: %s.', materialization_id)
+
+        status = None
+        start = time.monotonic()
+
+        while status not in (
+            JobStatus.DONE.value,
+            JobStatus.ERROR.value,
+            JobStatus.CANCELLED.value,
+            JobStatus.UNKNOWN.value,
+        ):
+
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(
+                    f"Timeout: PDT materialization job is not ready after {timeout}s. "
+                    f"Job id: {materialization_id}."
+                )
+
+            time.sleep(wait_time)
+
+            status_dict = self.pdt_build_status(materialization_id=materialization_id)
+            status = status_dict['status']
+
+        if status == JobStatus.ERROR.value:
+            msg = status_dict['message']
+            raise AirflowException(
+                f'PDT materialization job failed. Job id: {materialization_id}. Message:\n"{msg}"'
+            )
+        if status == JobStatus.CANCELLED.value:
+            raise AirflowException(f'PDT materialization job was cancelled. Job id: {materialization_id}.')
+        if status == JobStatus.UNKNOWN.value:
+            raise AirflowException(
+                f'PDT materialization job has unknown status. Job id: {materialization_id}.'
+            )
+
+        self.log.info('PDT materialization job completed successfully. Job id: %s.', materialization_id)
+
+    def get_api_source(self):
+        """Returns origin of the API call."""
+
+        # Infer API source based on Airflow version, e.g.:
+        # "2.1.4+composer" -> Cloud Composer
+        # "2.1.4" -> Airflow
+
+        self.log.debug(
+            "Got the following version for connection id: '%s'. Version: '%s'.", self.looker_conn_id, version
+        )
+
+        return 'composer' if 'composer' in version else 'airflow'

Review comment:
       Got it. I think I'll stick with tracking the `version` string itself then, if that's okay. I don't want to introduce additional overhead right now for this small tracking feature.
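A minimal sketch contrasting the two options discussed, assuming the API source is derived only from the Airflow version string (the `version` value the hook imports from `airflow.version`). `classify_source` mirrors the substring check in `get_api_source` above; `raw_source` is a hypothetical helper that simply forwards the version string, as proposed in this thread.

```python
def classify_source(version: str) -> str:
    """Current approach: collapse the version string into a coarse label."""
    # e.g. "2.1.4+composer" -> "composer", "2.1.4" -> "airflow"
    return "composer" if "composer" in version else "airflow"


def raw_source(version: str) -> str:
    """Proposed approach (hypothetical helper): send the version string as-is,
    so the receiving side can tell distributions and releases apart."""
    return version


for v in ("2.1.4+composer", "2.2.3"):
    print(v, "->", classify_source(v), "|", raw_source(v))
```

Forwarding the raw string keeps strictly more information (release number plus any local suffix such as `+composer`) at no extra cost in the hook.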







[GitHub] [airflow] alekseiloginov commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r817037282



##########
File path: airflow/providers/google/cloud/hooks/looker.py
##########
@@ -0,0 +1,268 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains a Google Cloud Looker hook."""
+
+import json
+import time
+from enum import Enum
+from typing import Dict, Optional
+
+from looker_sdk.rtl import api_settings, auth_session, requests_transport, serialize
+from looker_sdk.sdk.api40 import methods as methods40
+from packaging.version import parse as parse_version
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models.connection import Connection
+from airflow.version import version
+
+
+class LookerHook(BaseHook):
+    """Hook for Looker APIs."""
+
+    def __init__(
+        self,
+        looker_conn_id: str,
+    ) -> None:
+        super().__init__()
+        self.looker_conn_id = looker_conn_id
+        self.source = self.get_api_source()
+
+    def start_pdt_build(
+        self,
+        model: str,
+        view: str,
+        query_params: Optional[Dict] = None,
+    ):
+        """
+        Submits a PDT materialization job to Looker.
+
+        :param model: Required. The model of the PDT to start building.
+        :param view: Required. The view of the PDT to start building.
+        :param query_params: Optional. Additional materialization parameters.
+        """
+        self.log.info("Submitting PDT materialization job. Model: '%s', view: '%s'.", model, view)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        sdk = self.get_looker_sdk()
+        looker_ver = sdk.versions().looker_release_version
+        if parse_version(looker_ver) < parse_version("22.2.0"):
+            raise AirflowException(f'This API requires Looker version 22.2+. Found: {looker_ver}.')
+
+        # unpack query_params dict into kwargs (if not None)
+        if query_params:
+            resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source, **query_params)
+        else:
+            resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source)
+
+        self.log.info("Start PDT build response: '%s'.", resp)
+
+        return resp
+
+    def check_pdt_build(
+        self,
+        materialization_id: str,
+    ):
+        """
+        Gets the PDT materialization job status from Looker.
+
+        :param materialization_id: Required. The materialization id to check status for.
+        """
+        self.log.info("Requesting PDT materialization job status. Job id: %s.", materialization_id)
+
+        sdk = self.get_looker_sdk()
+        resp = sdk.check_pdt_build(materialization_id=materialization_id)
+
+        self.log.info("Check PDT build response: '%s'.", resp)
+        return resp
+
+    def pdt_build_status(
+        self,
+        materialization_id: str,
+    ) -> Dict:
+        """
+        Gets the PDT materialization job status.
+
+        :param materialization_id: Required. The materialization id to check status for.
+        """
+        resp = self.check_pdt_build(materialization_id=materialization_id)
+
+        status_json = resp['resp_text']
+        status_dict = json.loads(status_json)
+
+        self.log.info(
+            "PDT materialization job id: %s. Status: '%s'.", materialization_id, status_dict['status']
+        )
+
+        return status_dict
+
+    def stop_pdt_build(
+        self,
+        materialization_id: str,
+    ):
+        """
+        Starts a PDT materialization job cancellation request.
+
+        :param materialization_id: Required. The materialization id to stop.
+        """
+        self.log.info("Stopping PDT materialization. Job id: %s.", materialization_id)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        sdk = self.get_looker_sdk()
+        resp = sdk.stop_pdt_build(materialization_id=materialization_id, source=self.source)
+
+        self.log.info("Stop PDT build response: '%s'.", resp)
+        return resp
+
+    def wait_for_job(
+        self,
+        materialization_id: str,
+        wait_time: int = 10,
+        timeout: Optional[int] = None,
+    ) -> None:
+        """
+        Helper method which polls a PDT materialization job to check if it finishes.
+
+        :param materialization_id: Required. The materialization id to wait for.
+        :param wait_time: Optional. Number of seconds between checks.
+        :param timeout: Optional. How many seconds wait for job to be ready.
+        Used only if ``asynchronous`` is False.
+        """
+        self.log.info('Waiting for PDT materialization job to complete. Job id: %s.', materialization_id)
+
+        status = None
+        start = time.monotonic()
+
+        while status not in (
+            JobStatus.DONE.value,
+            JobStatus.ERROR.value,
+            JobStatus.CANCELLED.value,
+            JobStatus.UNKNOWN.value,
+        ):
+
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(
+                    f"Timeout: PDT materialization job is not ready after {timeout}s. "
+                    f"Job id: {materialization_id}."
+                )
+
+            time.sleep(wait_time)
+
+            status_dict = self.pdt_build_status(materialization_id=materialization_id)
+            status = status_dict['status']
+
+        if status == JobStatus.ERROR.value:
+            msg = status_dict['message']
+            raise AirflowException(
+                f'PDT materialization job failed. Job id: {materialization_id}. Message:\n"{msg}"'
+            )
+        if status == JobStatus.CANCELLED.value:
+            raise AirflowException(f'PDT materialization job was cancelled. Job id: {materialization_id}.')
+        if status == JobStatus.UNKNOWN.value:
+            raise AirflowException(
+                f'PDT materialization job has unknown status. Job id: {materialization_id}.'
+            )
+
+        self.log.info('PDT materialization job completed successfully. Job id: %s.', materialization_id)
+
+    def get_api_source(self):
+        """Returns origin of the API call."""
+
+        # Infer API source based on Airflow version, e.g.:
+        # "2.1.4+composer" -> Cloud Composer
+        # "2.1.4" -> Airflow
+
+        self.log.debug(
+            "Got the following version for connection id: '%s'. Version: '%s'.", self.looker_conn_id, version
+        )
+
+        return 'composer' if 'composer' in version else 'airflow'

Review comment:
       If there's no such way, maybe we can just track the `version` string itself.







[GitHub] [airflow] turbaszek commented on a change in pull request #20882: [WIP] Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r812172836



##########
File path: airflow/providers/google/cloud/hooks/looker.py
##########
@@ -0,0 +1,308 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains a Google Cloud Looker hook."""
+
+import json
+import time
+from enum import Enum
+from typing import Dict, Optional
+
+# TODO: Temporary import for Looker API response stub, remove once looker sdk 22.2 released
+import attr
+from looker_sdk.rtl import api_settings, auth_session, requests_transport, serialize
+# TODO: Temporary import for Looker API response stub, remove once looker sdk 22.2 released
+from looker_sdk.rtl.model import Model
+from looker_sdk.sdk.api40 import methods as methods40
+from packaging.version import parse as parse_version
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models.connection import Connection
+from airflow.version import version
+
+# TODO: uncomment once looker sdk 22.2 released
+# from looker_sdk.sdk.api40.models import MaterializePDT
+
+
+@attr.s(auto_attribs=True, init=False)
+class MaterializePDT(Model):
+    """
+    TODO: Temporary API response stub. Use Looker API class once looker sdk 22.2 released.
+    Attributes:
+    materialization_id: The ID of the enqueued materialization job, if any
+    resp_text: The string response
+    """
+
+    materialization_id: str
+    resp_text: Optional[str] = None
+
+    def __init__(self, *, materialization_id: str, resp_text: Optional[str] = None):
+        self.materialization_id = materialization_id
+        self.resp_text = resp_text
+
+
+class LookerHook(BaseHook):
+    """Hook for Looker APIs."""
+
+    def __init__(
+        self,
+        looker_conn_id: str,
+    ) -> None:
+        super().__init__()
+        self.looker_conn_id = looker_conn_id
+        self.source = self.get_api_source()
+
+    def start_pdt_build(
+        self,
+        model: str,
+        view: str,
+        query_params: Optional[Dict] = None,
+    ) -> MaterializePDT:
+        """
+        Submits a PDT materialization job to Looker.
+
+        :param model: Required. The model of the PDT to start building.
+        :param view: Required. The view of the PDT to start building.
+        :param query_params: Optional. Additional materialization parameters.
+        """
+        self.log.info("Submitting PDT materialization job. Model: '%s', view: '%s'.", model, view)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        sdk = self.get_looker_sdk()
+        looker_ver = sdk.versions().looker_release_version
+        if parse_version(looker_ver) < parse_version("22.2.0"):
+            raise AirflowException(f'This API requires Looker version 22.2+. Found: {looker_ver}.')
+
+        # TODO: Temporary API response stub. Use Looker API once looker sdk 22.2 released.
+        resp = MaterializePDT(materialization_id='366', resp_text=None)
+        # unpack query_params dict into kwargs (if not None)
+        # if query_params:
+        #     resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source, **query_params)
+        # else:
+        #     resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source)
+
+        self.log.info("Start PDT build response: '%s'.", resp)
+
+        return resp
+
+    def check_pdt_build(
+        self,
+        materialization_id: str,
+    ) -> MaterializePDT:
+        """
+        Gets the PDT materialization job status from Looker.
+
+        :param materialization_id: Required. The materialization id to check status for.
+        """
+        self.log.info("Requesting PDT materialization job status. Job id: %s.", materialization_id)
+
+        # TODO: Temporary API response stub. Use Looker API once looker sdk 22.2 released.
+        resp = MaterializePDT(
+            materialization_id='366',
+            resp_text='{"status":"running","runtime":null,'
+            '"message":"Materialize 100m_sum_aggregate_sdt, ",'
+            '"task_slug":"f522424e00f0039a8c8f2d53d773f310"}',
+        )
+        sdk = self.get_looker_sdk()  # NOQA
+        # resp = sdk.check_pdt_build(materialization_id=materialization_id)
+
+        self.log.info("Check PDT build response: '%s'.", resp)
+        return resp
+
+    def pdt_build_status(
+        self,
+        materialization_id: str,
+    ) -> Dict:
+        """
+        Gets the PDT materialization job status.
+
+        :param materialization_id: Required. The materialization id to check status for.
+        """
+        resp = self.check_pdt_build(materialization_id=materialization_id)
+
+        status_json = resp['resp_text']
+        status_dict = json.loads(status_json)
+
+        self.log.info(
+            "PDT materialization job id: %s. Status: '%s'.", materialization_id, status_dict['status']
+        )
+
+        return status_dict
+
+    def stop_pdt_build(
+        self,
+        materialization_id: str,
+    ) -> MaterializePDT:
+        """
+        Starts a PDT materialization job cancellation request.
+
+        :param materialization_id: Required. The materialization id to stop.
+        """
+        self.log.info("Stopping PDT materialization. Job id: %s.", materialization_id)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        # TODO: Temporary API response stub. Use Looker API once looker sdk 22.2 released.
+        resp = MaterializePDT(
+            materialization_id='366',
+            resp_text='{"success":true,"connection":"incremental_pdts_test",'
+            '"statusment":"KILL 810852",'
+            '"task_slug":"6bad8184f94407134251be8fd18af834"}',
+        )

Review comment:
       If this is necessary, then we can use `json.dumps` instead of keeping the dict as a string.
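A minimal sketch of that suggestion, reusing the `check_pdt_build` stub payload from the diff above. A plain `dataclass` named `MaterializePDTStub` (hypothetical) stands in for the temporary `attr`-based `MaterializePDT` model so the example has no third-party dependencies.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class MaterializePDTStub:
    """Hypothetical stand-in for the temporary MaterializePDT response model."""

    materialization_id: str
    resp_text: Optional[str] = None


# Keep the stub payload as a real dict and serialize it, instead of hand-writing JSON text.
check_payload = {
    "status": "running",
    "runtime": None,  # serialized as JSON null
    "message": "Materialize 100m_sum_aggregate_sdt, ",
    "task_slug": "f522424e00f0039a8c8f2d53d773f310",
}
resp = MaterializePDTStub(materialization_id="366", resp_text=json.dumps(check_payload))

# pdt_build_status() parses resp_text with json.loads, so the round trip is lossless.
status_dict = json.loads(resp.resp_text)
print(status_dict["status"])
```

Serializing from a dict also rules out quoting mistakes in the hand-written JSON strings.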







[GitHub] [airflow] potiuk commented on pull request #20882: [WIP] Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1019378567


   Can you please rebase onto the latest main?





[GitHub] [airflow] lwyszomi commented on a change in pull request #20882: [WIP] Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
lwyszomi commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r797638090



##########
File path: airflow/providers/google/cloud/operators/looker.py
##########
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains Google Cloud Looker operators."""
+
+from typing import Optional, Dict
+
+from airflow.models.baseoperator import BaseOperator
+from airflow.providers.google.cloud.hooks.looker import LookerHook
+
+
+class LookerStartPdtBuildOperator(BaseOperator):

Review comment:
       Exactly







[GitHub] [airflow] potiuk commented on pull request #20882: [WIP] Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1024977299


   I think another rebase (and fixes to the failing checks) is needed.





[GitHub] [airflow] boring-cyborg[bot] commented on pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1060770856


   Awesome work, congrats on your first merged pull request!
   





[GitHub] [airflow] potiuk commented on pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1060701478


   All right, @alekseiloginov, I rebased your change on top of the latest main (which should already have typing-extensions and constraints fixed).





[GitHub] [airflow] mik-laj commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r813980096



##########
File path: docs/apache-airflow-providers-google/operators/cloud/looker.rst
##########
@@ -0,0 +1,93 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+Google Cloud Looker Operators
+===============================
+
+Looker is a business intelligence software and big data analytics platform that
+helps you explore, analyze and share real-time business analytics easily.
+
+Looker has a Public API and associated SDK clients in different languages,
+which allow programmatic access to the Looker data platform.
+
+For more information visit `Looker API documentation <https://docs.looker.com/reference/api-and-integration>`_.
+
+Prerequisite Tasks
+------------------
+
+To use these operators, you must do a few things:
+
+* Install API libraries via **pip**.
+
+.. code-block:: bash
+
+  pip install 'apache-airflow[google]'
+
+Detailed information is available for :doc:`Installation <apache-airflow:installation/index>`.
+
+Communication between Airflow and Looker is done via `Looker API <https://docs.looker.com/reference/api-and-integration/api-reference/v4.0>`_.
+To facilitate the API communication Looker operators use `Looker SDK <https://pypi.org/project/looker-sdk/>`_ as an API client.
+Before calling API, Looker SDK needs to authenticate itself using your Looker API credentials.
+
+* Obtain your Looker API credentials using instructions in the `Looker API documentation <https://docs.looker.com/reference/api-and-integration/api-auth#authentication_with_an_sdk>`_.
+
+* Obtain your Looker API path and port as described in the `Looker API documentation <https://docs.looker.com/reference/api-and-integration/api-getting-started#looker_api_path_and_port>`_.
+
+* Setup a Looker connection in Airflow. You can check :doc:`apache-airflow:howto/connection`
+
+See the following example of a connection setup:

Review comment:
       Documentation on connection setup should be in a separate documentation file.
   https://airflow.apache.org/docs/apache-airflow-providers-google/stable/connections/index.html
   In this documentation, it is worth adding at least one URL configuration example.
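A sketch of building such a URL configuration with the standard library. It assumes (not confirmed in this thread) that the Looker connection keeps the API host and port in `host`/`port`, the API client ID and secret in `login`/`password`, and optional settings such as `verify_ssl` and `timeout` in extras. The `http` connection type, the `AIRFLOW_CONN_YOUR_CONN_LOOKER` variable name, and all values are placeholders. Airflow can read a connection from an `AIRFLOW_CONN_<CONN_ID>` environment variable given in URI form.

```python
from typing import Dict
from urllib.parse import quote, urlencode


def looker_conn_uri(
    host: str,
    port: int,
    client_id: str,
    client_secret: str,
    extras: Dict[str, str],
    conn_type: str = "http",  # assumption: placeholder connection type
) -> str:
    """Build a URI shaped like <type>://<login>:<password>@<host>:<port>?<extras>."""
    # Percent-encode credentials so characters such as '/' or '@' don't break the URI.
    login = quote(client_id, safe="")
    password = quote(client_secret, safe="")
    query = urlencode(extras)
    uri = f"{conn_type}://{login}:{password}@{host}:{port}"
    return f"{uri}?{query}" if query else uri


uri = looker_conn_uri(
    host="your.looker.com",
    port=19999,
    client_id="YourClientID",
    client_secret="Your/Secret",
    extras={"verify_ssl": "true", "timeout": "120"},
)
print(f"export AIRFLOW_CONN_YOUR_CONN_LOOKER='{uri}'")
```

The exact field-to-credential mapping should be checked against the hook's `get_looker_sdk` implementation before documenting it.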




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] alekseiloginov commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r817914659



##########
File path: airflow/providers/google/cloud/operators/looker.py
##########
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains Google Cloud Looker operators."""
+
+from typing import Optional, Dict
+
+from airflow.models.baseoperator import BaseOperator
+from airflow.providers.google.cloud.hooks.looker import LookerHook
+
+
+class LookerStartPdtBuildOperator(BaseOperator):
+    """
+    Submits a PDT materialization job to Looker.
+
+    :param looker_conn_id: Required. The connection ID to use connecting to Looker.
+    :type looker_conn_id: str
+    :param model: Required. The model of the PDT to start building.
+    :type model: str
+    :param view: Required. The view of the PDT to start building.
+    :type view: str
+    :param query_params: Optional. Additional materialization parameters.
+    :type query_params: Dict
+    :param asynchronous: Optional. Flag indicating whether to wait for the job to finish or return immediately.
+        This is useful for submitting long-running jobs and
+        waiting on them asynchronously using the LookerCheckPdtBuildSensor.
+    :type asynchronous: bool
+    :param cancel_on_kill: Optional. Flag indicating whether to cancel the hook's job when on_kill is called.
+    :type cancel_on_kill: bool
+    :param wait_time: Optional. Number of seconds between checks for the job to be ready. Used only if ``asynchronous`` is False.
+    :type wait_time: int
+    :param wait_timeout: Optional. How many seconds to wait for the job to be ready. Used only if ``asynchronous`` is False.
+    :type wait_timeout: int
+    """
+
+    def __init__(
+        self,
+        looker_conn_id: str,
+        model: str,
+        view: str,
+        query_params: Optional[Dict] = None,
+        asynchronous: bool = False,
+        cancel_on_kill: bool = True,
+        wait_time: int = 10,
+        wait_timeout: Optional[int] = None,
+        **kwargs) -> None:
+        super().__init__(**kwargs)
+        self.model = model
+        self.view = view
+        self.query_params = query_params
+        self.looker_conn_id = looker_conn_id
+        self.asynchronous = asynchronous
+        self.cancel_on_kill = cancel_on_kill
+        self.wait_time = wait_time
+        self.wait_timeout = wait_timeout
+        self.hook: Optional[LookerHook] = None
+        self.materialization_id: Optional[str] = None
+
+    def execute(self, context):

Review comment:
       Done! ✅ 
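The operator docstring above describes how `asynchronous`, `wait_time` and `wait_timeout` interact. The following is a minimal, self-contained sketch of that interaction under stated assumptions: `FakeLookerHook` and `run_start_pdt_build` are hypothetical stand-ins for `LookerHook` and the operator's `execute()` (whose body is not part of this diff), the `JobStatus` string values are assumed, and stdlib exceptions replace `AirflowException`. The polling loop follows the hook's `wait_for_job` as quoted later in this thread.

```python
import json
import time
from enum import Enum
from typing import Dict, List, Optional


class JobStatus(Enum):
    # Assumed status strings; the real enum is not shown in this diff.
    PENDING = "new"
    RUNNING = "running"
    DONE = "complete"
    ERROR = "error"
    CANCELLED = "killed"
    UNKNOWN = "unknown"


# Statuses after which polling stops, as in LookerHook.wait_for_job.
TERMINAL_STATUSES = {
    JobStatus.DONE.value,
    JobStatus.ERROR.value,
    JobStatus.CANCELLED.value,
    JobStatus.UNKNOWN.value,
}


class FakeLookerHook:
    """Hypothetical stand-in for LookerHook that replays canned job statuses."""

    def __init__(self, statuses: List[str]) -> None:
        self._statuses = list(statuses)

    def start_pdt_build(self, model: str, view: str, query_params: Optional[Dict] = None) -> Dict:
        # The real hook returns an SDK model object; a dict keeps the sketch self-contained.
        return {"materialization_id": "42"}

    def pdt_build_status(self, materialization_id: str) -> Dict:
        # Like the hook, decode the JSON text carried in the "resp_text" field.
        resp = {"resp_text": json.dumps({"status": self._statuses.pop(0), "message": ""})}
        return json.loads(resp["resp_text"])

    def wait_for_job(self, materialization_id: str, wait_time: float = 10,
                     timeout: Optional[float] = None) -> None:
        status = None
        start = time.monotonic()
        while status not in TERMINAL_STATUSES:
            if timeout and start + timeout < time.monotonic():
                raise TimeoutError(f"Job {materialization_id} not ready after {timeout}s.")
            time.sleep(wait_time)
            status = self.pdt_build_status(materialization_id)["status"]
        if status != JobStatus.DONE.value:
            raise RuntimeError(f"Job {materialization_id} ended with status '{status}'.")


def run_start_pdt_build(hook, model: str, view: str, asynchronous: bool = False,
                        wait_time: float = 10, wait_timeout: Optional[float] = None) -> str:
    """Hypothetical execute(): submit the build, then block unless asynchronous."""
    materialization_id = hook.start_pdt_build(model=model, view=view)["materialization_id"]
    if not asynchronous:
        hook.wait_for_job(materialization_id, wait_time=wait_time, timeout=wait_timeout)
    # Returning the id lets a downstream sensor poll the job when asynchronous=True.
    return materialization_id


# Usage: two polls ("running", then "complete") before the call returns.
print(run_start_pdt_build(FakeLookerHook(["running", "complete"]), "my_model", "my_view", wait_time=0))  # prints 42
```

With `asynchronous=True` the sketch returns right after submission and never polls, which is the case the docstring pairs with `LookerCheckPdtBuildSensor`.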







[GitHub] [airflow] alekseiloginov commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r817921065



##########
File path: docs/apache-airflow-providers-google/operators/cloud/looker.rst
##########
@@ -0,0 +1,93 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+Google Cloud Looker Operators
+===============================
+
+Looker is a business intelligence software and big data analytics platform that
+helps you explore, analyze and share real-time business analytics easily.
+
+Looker has a Public API and associated SDK clients in different languages,
+which allow programmatic access to the Looker data platform.
+
+For more information visit `Looker API documentation <https://docs.looker.com/reference/api-and-integration>`_.
+
+Prerequisite Tasks
+------------------
+
+To use these operators, you must do a few things:
+
+* Install API libraries via **pip**.
+
+.. code-block:: bash
+
+  pip install 'apache-airflow[google]'
+
+Detailed information is available for :doc:`Installation <apache-airflow:installation/index>`.
+
+Communication between Airflow and Looker is done via `Looker API <https://docs.looker.com/reference/api-and-integration/api-reference/v4.0>`_.
+The Looker operators use the `Looker SDK <https://pypi.org/project/looker-sdk/>`_ as their API client.
+Before calling the API, the Looker SDK authenticates using your Looker API credentials.
+
+* Obtain your Looker API credentials using the instructions in the `Looker API documentation <https://docs.looker.com/reference/api-and-integration/api-auth#authentication_with_an_sdk>`_.
+
+* Obtain your Looker API path and port as described in the `Looker API documentation <https://docs.looker.com/reference/api-and-integration/api-getting-started#looker_api_path_and_port>`_.
+
+* Set up a Looker connection in Airflow. See :doc:`apache-airflow:howto/connection`.
+
+See the following example of a connection setup:

Review comment:
       Done! ✅ 







[GitHub] [airflow] alekseiloginov commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r817125088



##########
File path: airflow/providers/google/cloud/hooks/looker.py
##########
@@ -0,0 +1,268 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains a Google Cloud Looker hook."""
+
+import json
+import time
+from enum import Enum
+from typing import Dict, Optional
+
+from looker_sdk.rtl import api_settings, auth_session, requests_transport, serialize
+from looker_sdk.sdk.api40 import methods as methods40
+from packaging.version import parse as parse_version
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models.connection import Connection
+from airflow.version import version
+
+
+class LookerHook(BaseHook):
+    """Hook for Looker APIs."""
+
+    def __init__(

Review comment:
       Not sure what you mean by `Looker is available for selection during connection setup`. Do you mean the drop-down list of connection types? (The Looker connection uses the `HTTP` type.)







[GitHub] [airflow] alekseiloginov commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r817043465



##########
File path: airflow/providers/google/cloud/hooks/looker.py
##########
@@ -0,0 +1,268 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains a Google Cloud Looker hook."""
+
+import json
+import time
+from enum import Enum
+from typing import Dict, Optional
+
+from looker_sdk.rtl import api_settings, auth_session, requests_transport, serialize
+from looker_sdk.sdk.api40 import methods as methods40
+from packaging.version import parse as parse_version
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models.connection import Connection
+from airflow.version import version
+
+
+class LookerHook(BaseHook):
+    """Hook for Looker APIs."""
+
+    def __init__(
+        self,
+        looker_conn_id: str,
+    ) -> None:
+        super().__init__()
+        self.looker_conn_id = looker_conn_id
+        self.source = self.get_api_source()
+
+    def start_pdt_build(
+        self,
+        model: str,
+        view: str,
+        query_params: Optional[Dict] = None,
+    ):
+        """
+        Submits a PDT materialization job to Looker.
+
+        :param model: Required. The model of the PDT to start building.
+        :param view: Required. The view of the PDT to start building.
+        :param query_params: Optional. Additional materialization parameters.
+        """
+        self.log.info("Submitting PDT materialization job. Model: '%s', view: '%s'.", model, view)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        sdk = self.get_looker_sdk()
+        looker_ver = sdk.versions().looker_release_version
+        if parse_version(looker_ver) < parse_version("22.2.0"):
+            raise AirflowException(f'This API requires Looker version 22.2+. Found: {looker_ver}.')
+
+        # unpack query_params dict into kwargs (if not None)
+        if query_params:
+            resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source, **query_params)
+        else:
+            resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source)
+
+        self.log.info("Start PDT build response: '%s'.", resp)
+
+        return resp
+
+    def check_pdt_build(
+        self,
+        materialization_id: str,
+    ):
+        """
+        Gets the PDT materialization job status from Looker.
+
+        :param materialization_id: Required. The materialization id to check status for.
+        """
+        self.log.info("Requesting PDT materialization job status. Job id: %s.", materialization_id)
+
+        sdk = self.get_looker_sdk()
+        resp = sdk.check_pdt_build(materialization_id=materialization_id)
+
+        self.log.info("Check PDT build response: '%s'.", resp)
+        return resp
+
+    def pdt_build_status(
+        self,
+        materialization_id: str,
+    ) -> Dict:
+        """
+        Gets the PDT materialization job status.
+
+        :param materialization_id: Required. The materialization id to check status for.
+        """
+        resp = self.check_pdt_build(materialization_id=materialization_id)
+
+        status_json = resp['resp_text']
+        status_dict = json.loads(status_json)
+
+        self.log.info(
+            "PDT materialization job id: %s. Status: '%s'.", materialization_id, status_dict['status']
+        )
+
+        return status_dict
+
+    def stop_pdt_build(
+        self,
+        materialization_id: str,
+    ):
+        """
+        Starts a PDT materialization job cancellation request.
+
+        :param materialization_id: Required. The materialization id to stop.
+        """
+        self.log.info("Stopping PDT materialization. Job id: %s.", materialization_id)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        sdk = self.get_looker_sdk()
+        resp = sdk.stop_pdt_build(materialization_id=materialization_id, source=self.source)
+
+        self.log.info("Stop PDT build response: '%s'.", resp)
+        return resp
+
+    def wait_for_job(
+        self,
+        materialization_id: str,
+        wait_time: int = 10,
+        timeout: Optional[int] = None,
+    ) -> None:
+        """
+        Helper method which polls a PDT materialization job to check if it finishes.
+
+        :param materialization_id: Required. The materialization id to wait for.
+        :param wait_time: Optional. Number of seconds between checks.
+        :param timeout: Optional. How many seconds to wait for the job to be ready.
+        Used only if ``asynchronous`` is False.
+        """
+        self.log.info('Waiting for PDT materialization job to complete. Job id: %s.', materialization_id)
+
+        status = None
+        start = time.monotonic()
+
+        while status not in (
+            JobStatus.DONE.value,
+            JobStatus.ERROR.value,
+            JobStatus.CANCELLED.value,
+            JobStatus.UNKNOWN.value,
+        ):
+
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(
+                    f"Timeout: PDT materialization job is not ready after {timeout}s. "
+                    f"Job id: {materialization_id}."
+                )
+
+            time.sleep(wait_time)
+
+            status_dict = self.pdt_build_status(materialization_id=materialization_id)
+            status = status_dict['status']
+
+        if status == JobStatus.ERROR.value:
+            msg = status_dict['message']
+            raise AirflowException(
+                f'PDT materialization job failed. Job id: {materialization_id}. Message:\n"{msg}"'
+            )
+        if status == JobStatus.CANCELLED.value:
+            raise AirflowException(f'PDT materialization job was cancelled. Job id: {materialization_id}.')
+        if status == JobStatus.UNKNOWN.value:
+            raise AirflowException(
+                f'PDT materialization job has unknown status. Job id: {materialization_id}.'
+            )
+
+        self.log.info('PDT materialization job completed successfully. Job id: %s.', materialization_id)
+
+    def get_api_source(self):
+        """Returns origin of the API call."""
+
+        # Infer API source based on Airflow version, e.g.:
+        # "2.1.4+composer" -> Cloud Composer
+        # "2.1.4" -> Airflow
+
+        self.log.debug(
+            "Got the following version for connection id: '%s'. Version: '%s'.", self.looker_conn_id, version
+        )
+
+        return 'composer' if 'composer' in version else 'airflow'
+
+    def get_looker_sdk(self):
+        """Returns Looker SDK client for Looker API 4.0."""
+
+        conn = self.get_connection(self.looker_conn_id)
+        settings = LookerApiSettings(conn)
+
+        transport = requests_transport.RequestsTransport.configure(settings)
+        return methods40.Looker40SDK(
+            auth_session.AuthSession(settings, transport, serialize.deserialize40, "4.0"),
+            serialize.deserialize40,
+            serialize.serialize,
+            transport,
+            "4.0",
+        )
+
+
+class LookerApiSettings(api_settings.ApiSettings):
+    """Custom implementation of Looker SDK's `ApiSettings` class."""
+
+    def __init__(
+        self,
+        conn: Connection,
+    ) -> None:
+        self.conn = conn  # need to init before `read_config` is called in super
+        super().__init__()
+
+    def read_config(self) -> Dict[str, str]:
+        """
+        Overrides the default logic of getting connection settings. Fetches
+        the connection settings from Airflow's connection object.
+        """
+
+        config = {}
+
+        if self.conn.host is None:
+            raise AirflowException(f'No `host` was supplied in connection: {self.conn.id}.')
+
+        if self.conn.port:
+            config["base_url"] = f"{self.conn.host}:{self.conn.port}"  # port is optional
+        else:
+            config["base_url"] = self.conn.host
+
+        if self.conn.login:
+            config["client_id"] = self.conn.login
+        else:
+            raise AirflowException(f'No `login` was supplied in connection: {self.conn.id}.')
+
+        if self.conn.password:
+            config["client_secret"] = self.conn.password
+        else:
+            raise AirflowException(f'No `password` was supplied in connection: {self.conn.id}.')
+
+        extras = self.conn.extra_dejson  # type: Dict
+
+        if 'verify_ssl' in extras:
+            config["verify_ssl"] = extras["verify_ssl"]  # optional
+
+        if 'timeout' in extras:
+            config["timeout"] = extras["timeout"]  # optional

Review comment:
       Our Looker SDK accepts a string here and converts it to an int. See: https://github.com/looker-open-source/sdk-codegen/blob/2432a688438d64d18531ee3ac920c9933bc8a185/python/looker_sdk/rtl/api_settings.py#L90
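To make the mapping in `LookerApiSettings.read_config` easier to review, here is a pure-Python sketch of how the Airflow connection fields become Looker SDK settings. `LookerConnFields` and `looker_sdk_config` are hypothetical names; the `extra` dict stands in for `Connection.extra_dejson`, and `ValueError` replaces `AirflowException`. The sketch formats its error messages with `conn_id`. The quoted diff uses `self.conn.id`, and on Airflow's `Connection` model that attribute is the integer primary key, so `conn_id` is likely what was intended. As discussed above, a string `timeout` is passed through for the SDK to convert.

```python
from typing import Any, Dict, NamedTuple, Optional


class LookerConnFields(NamedTuple):
    """Hypothetical subset of airflow.models.Connection attributes read by the hook."""

    conn_id: str
    host: Optional[str]
    port: Optional[int]
    login: Optional[str]
    password: Optional[str]
    extra: Dict[str, Any]  # stands in for Connection.extra_dejson


def looker_sdk_config(conn: LookerConnFields) -> Dict[str, Any]:
    """Map connection fields to Looker SDK settings, following LookerApiSettings.read_config."""
    if conn.host is None:
        raise ValueError(f"No `host` was supplied in connection: {conn.conn_id}.")
    if not conn.login:
        raise ValueError(f"No `login` was supplied in connection: {conn.conn_id}.")
    if not conn.password:
        raise ValueError(f"No `password` was supplied in connection: {conn.conn_id}.")

    config: Dict[str, Any] = {
        # The port is optional and, when set, is appended to the host.
        "base_url": f"{conn.host}:{conn.port}" if conn.port else conn.host,
        "client_id": conn.login,
        "client_secret": conn.password,
    }
    # Optional extras are passed through unchanged; a string "timeout" is
    # left for the Looker SDK to convert to int.
    for key in ("verify_ssl", "timeout"):
        if key in conn.extra:
            config[key] = conn.extra[key]
    return config


# Usage with hypothetical values; the host is assumed to include the scheme.
conn = LookerConnFields(
    conn_id="my_looker_conn",
    host="https://looker.example.com",
    port=19999,
    login="my_client_id",
    password="my_client_secret",
    extra={"verify_ssl": True, "timeout": "120"},
)
print(looker_sdk_config(conn)["base_url"])  # prints https://looker.example.com:19999
```

Keeping the required-field checks together before building the dict makes the three failure modes easy to see; the quoted hook interleaves them with the config assignments but raises on the same conditions.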







[GitHub] [airflow] alekseiloginov commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r817997055



##########
File path: airflow/providers/google/cloud/hooks/looker.py
##########
@@ -0,0 +1,268 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains a Google Cloud Looker hook."""
+
+import json
+import time
+from enum import Enum
+from typing import Dict, Optional
+
+from looker_sdk.rtl import api_settings, auth_session, requests_transport, serialize
+from looker_sdk.sdk.api40 import methods as methods40
+from packaging.version import parse as parse_version
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models.connection import Connection
+from airflow.version import version
+
+
+class LookerHook(BaseHook):
+    """Hook for Looker APIs."""
+
+    def __init__(

Review comment:
       I had the same thought at first, but then I decided not to create a new connection type, as Looker is in the process of transitioning to GCP and should ideally use a GCP connection soon. I didn't want users to get used to this temporary Looker connection. Hope that makes sense.







[GitHub] [airflow] mik-laj commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r817935893



##########
File path: airflow/providers/google/cloud/hooks/looker.py
##########
@@ -0,0 +1,268 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains a Google Cloud Looker hook."""
+
+import json
+import time
+from enum import Enum
+from typing import Dict, Optional
+
+from looker_sdk.rtl import api_settings, auth_session, requests_transport, serialize
+from looker_sdk.sdk.api40 import methods as methods40
+from packaging.version import parse as parse_version
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models.connection import Connection
+from airflow.version import version
+
+
+class LookerHook(BaseHook):
+    """Hook for Looker APIs."""
+
+    def __init__(
+        self,
+        looker_conn_id: str,
+    ) -> None:
+        super().__init__()
+        self.looker_conn_id = looker_conn_id
+        self.source = self.get_api_source()
+
+    def start_pdt_build(
+        self,
+        model: str,
+        view: str,
+        query_params: Optional[Dict] = None,
+    ):
+        """
+        Submits a PDT materialization job to Looker.
+
+        :param model: Required. The model of the PDT to start building.
+        :param view: Required. The view of the PDT to start building.
+        :param query_params: Optional. Additional materialization parameters.
+        """
+        self.log.info("Submitting PDT materialization job. Model: '%s', view: '%s'.", model, view)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        sdk = self.get_looker_sdk()
+        looker_ver = sdk.versions().looker_release_version
+        if parse_version(looker_ver) < parse_version("22.2.0"):
+            raise AirflowException(f'This API requires Looker version 22.2+. Found: {looker_ver}.')
+
+        # unpack query_params dict into kwargs (if not None)
+        if query_params:
+            resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source, **query_params)
+        else:
+            resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source)
+
+        self.log.info("Start PDT build response: '%s'.", resp)
+
+        return resp
+
+    def check_pdt_build(
+        self,
+        materialization_id: str,
+    ):
+        """
+        Gets the PDT materialization job status from Looker.
+
+        :param materialization_id: Required. The materialization id to check status for.
+        """
+        self.log.info("Requesting PDT materialization job status. Job id: %s.", materialization_id)
+
+        sdk = self.get_looker_sdk()
+        resp = sdk.check_pdt_build(materialization_id=materialization_id)
+
+        self.log.info("Check PDT build response: '%s'.", resp)
+        return resp
+
+    def pdt_build_status(
+        self,
+        materialization_id: str,
+    ) -> Dict:
+        """
+        Gets the PDT materialization job status.
+
+        :param materialization_id: Required. The materialization id to check status for.
+        """
+        resp = self.check_pdt_build(materialization_id=materialization_id)
+
+        status_json = resp['resp_text']
+        status_dict = json.loads(status_json)
+
+        self.log.info(
+            "PDT materialization job id: %s. Status: '%s'.", materialization_id, status_dict['status']
+        )
+
+        return status_dict
+
+    def stop_pdt_build(
+        self,
+        materialization_id: str,
+    ):
+        """
+        Starts a PDT materialization job cancellation request.
+
+        :param materialization_id: Required. The materialization id to stop.
+        """
+        self.log.info("Stopping PDT materialization. Job id: %s.", materialization_id)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        sdk = self.get_looker_sdk()
+        resp = sdk.stop_pdt_build(materialization_id=materialization_id, source=self.source)
+
+        self.log.info("Stop PDT build response: '%s'.", resp)
+        return resp
+
+    def wait_for_job(
+        self,
+        materialization_id: str,
+        wait_time: int = 10,
+        timeout: Optional[int] = None,
+    ) -> None:
+        """
+        Helper method which polls a PDT materialization job to check if it finishes.
+
+        :param materialization_id: Required. The materialization id to wait for.
+        :param wait_time: Optional. Number of seconds between checks.
+        :param timeout: Optional. How many seconds to wait for the job to be ready.
+        Used only if ``asynchronous`` is False.
+        """
+        self.log.info('Waiting for PDT materialization job to complete. Job id: %s.', materialization_id)
+
+        status = None
+        start = time.monotonic()
+
+        while status not in (
+            JobStatus.DONE.value,
+            JobStatus.ERROR.value,
+            JobStatus.CANCELLED.value,
+            JobStatus.UNKNOWN.value,
+        ):
+
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(
+                    f"Timeout: PDT materialization job is not ready after {timeout}s. "
+                    f"Job id: {materialization_id}."
+                )
+
+            time.sleep(wait_time)
+
+            status_dict = self.pdt_build_status(materialization_id=materialization_id)
+            status = status_dict['status']
+
+        if status == JobStatus.ERROR.value:
+            msg = status_dict['message']
+            raise AirflowException(
+                f'PDT materialization job failed. Job id: {materialization_id}. Message:\n"{msg}"'
+            )
+        if status == JobStatus.CANCELLED.value:
+            raise AirflowException(f'PDT materialization job was cancelled. Job id: {materialization_id}.')
+        if status == JobStatus.UNKNOWN.value:
+            raise AirflowException(
+                f'PDT materialization job has unknown status. Job id: {materialization_id}.'
+            )
+
+        self.log.info('PDT materialization job completed successfully. Job id: %s.', materialization_id)
+
+    def get_api_source(self):
+        """Returns origin of the API call."""
+
+        # Infer API source based on Airflow version, e.g.:
+        # "2.1.4+composer" -> Cloud Composer
+        # "2.1.4" -> Airflow
+
+        self.log.debug(
+            "Got the following version for connection id: '%s'. Version: '%s'.", self.looker_conn_id, version
+        )
+
+        return 'composer' if 'composer' in version else 'airflow'

Review comment:
       Tracking the version number is fine.
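The hook makes two version-based decisions: it tags each call with a source inferred from the Airflow version string (for example `"2.1.4+composer"` maps to Cloud Composer), and it refuses to call the PDT API on Looker releases older than 22.2. Here is a pure-Python sketch of both checks. `release_tuple` and `supports_pdt_api` are hypothetical helpers, and the sketch swaps the hook's `packaging.version.parse` for a simpler numeric-prefix comparison that ignores pre-release and local-version suffixes.

```python
import re
from typing import Tuple


def api_source(airflow_version: str) -> str:
    """Same rule as LookerHook.get_api_source, e.g. '2.1.4+composer' -> 'composer'."""
    return "composer" if "composer" in airflow_version else "airflow"


def release_tuple(version: str, width: int = 3) -> Tuple[int, ...]:
    """Hypothetical helper: leading numeric components of a release string, zero-padded."""
    parts = []
    for piece in version.split("."):
        match = re.match(r"\d+", piece)
        if match is None:
            break
        parts.append(int(match.group()))
    # Pad so that "22.2" compares equal to "22.2.0".
    parts += [0] * (width - len(parts))
    return tuple(parts)


def supports_pdt_api(looker_release_version: str, minimum: str = "22.2.0") -> bool:
    """Mimic the hook's gate: the PDT start/stop API needs Looker 22.2+."""
    return release_tuple(looker_release_version) >= release_tuple(minimum)


print(api_source("2.1.4+composer"), supports_pdt_api("22.10.1"))  # prints composer True
```

Comparing integer tuples rather than raw strings matters here: as strings, `"22.10.1"` would sort before `"22.2.0"`.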







[GitHub] [airflow] alekseiloginov commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r817919774



##########
File path: airflow/providers/google/cloud/hooks/looker.py
##########
@@ -0,0 +1,308 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains a Google Cloud Looker hook."""
+
+import json
+import time
+from enum import Enum
+from typing import Dict, Optional
+
+# TODO: Temporary import for Looker API response stub, remove once looker sdk 22.2 released

Review comment:
       I removed these TODOs completely, as Looker SDK 22.2 has now been released.

##########
File path: airflow/providers/google/cloud/hooks/looker.py
##########
@@ -0,0 +1,308 @@
+# TODO: Temporary import for Looker API response stub, remove once looker sdk 22.2 released
+import attr
+from looker_sdk.rtl import api_settings, auth_session, requests_transport, serialize
+# TODO: Temporary import for Looker API response stub, remove once looker sdk 22.2 released
+from looker_sdk.rtl.model import Model
+from looker_sdk.sdk.api40 import methods as methods40
+from packaging.version import parse as parse_version
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models.connection import Connection
+from airflow.version import version
+
+# TODO: uncomment once looker sdk 22.2 released
+# from looker_sdk.sdk.api40.models import MaterializePDT
+
+
+@attr.s(auto_attribs=True, init=False)
+class MaterializePDT(Model):
+    """
+    TODO: Temporary API response stub. Use Looker API class once looker sdk 22.2 released.

Review comment:
       Same here: I removed these TODOs completely, since looker-sdk 22.2 is now available.







[GitHub] [airflow] alekseiloginov commented on pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1060078082


   @potiuk - just rebased!





[GitHub] [airflow] mik-laj commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r813956130



##########
File path: airflow/providers/google/cloud/hooks/looker.py
##########
@@ -0,0 +1,268 @@
+"""This module contains a Google Cloud Looker hook."""
+
+import json
+import time
+from enum import Enum
+from typing import Dict, Optional
+
+from looker_sdk.rtl import api_settings, auth_session, requests_transport, serialize
+from looker_sdk.sdk.api40 import methods as methods40
+from packaging.version import parse as parse_version
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models.connection import Connection
+from airflow.version import version
+
+
+class LookerHook(BaseHook):
+    """Hook for Looker APIs."""
+
+    def __init__(
+        self,
+        looker_conn_id: str,
+    ) -> None:
+        super().__init__()
+        self.looker_conn_id = looker_conn_id
+        self.source = self.get_api_source()
+
+    def start_pdt_build(
+        self,
+        model: str,
+        view: str,
+        query_params: Optional[Dict] = None,
+    ):
+        """
+        Submits a PDT materialization job to Looker.
+
+        :param model: Required. The model of the PDT to start building.
+        :param view: Required. The view of the PDT to start building.
+        :param query_params: Optional. Additional materialization parameters.
+        """
+        self.log.info("Submitting PDT materialization job. Model: '%s', view: '%s'.", model, view)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        sdk = self.get_looker_sdk()
+        looker_ver = sdk.versions().looker_release_version
+        if parse_version(looker_ver) < parse_version("22.2.0"):
+            raise AirflowException(f'This API requires Looker version 22.2+. Found: {looker_ver}.')
+
+        # unpack query_params dict into kwargs (if not None)
+        if query_params:
+            resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source, **query_params)
+        else:
+            resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source)
+
+        self.log.info("Start PDT build response: '%s'.", resp)
+
+        return resp
+
+    def check_pdt_build(
+        self,
+        materialization_id: str,
+    ):
+        """
+        Gets the PDT materialization job status from Looker.
+
+        :param materialization_id: Required. The materialization id to check status for.
+        """
+        self.log.info("Requesting PDT materialization job status. Job id: %s.", materialization_id)
+
+        sdk = self.get_looker_sdk()
+        resp = sdk.check_pdt_build(materialization_id=materialization_id)
+
+        self.log.info("Check PDT build response: '%s'.", resp)
+        return resp
+
+    def pdt_build_status(
+        self,
+        materialization_id: str,
+    ) -> Dict:
+        """
+        Gets the PDT materialization job status.
+
+        :param materialization_id: Required. The materialization id to check status for.
+        """
+        resp = self.check_pdt_build(materialization_id=materialization_id)
+
+        status_json = resp['resp_text']
+        status_dict = json.loads(status_json)
+
+        self.log.info(
+            "PDT materialization job id: %s. Status: '%s'.", materialization_id, status_dict['status']
+        )
+
+        return status_dict
+
+    def stop_pdt_build(
+        self,
+        materialization_id: str,
+    ):
+        """
+        Starts a PDT materialization job cancellation request.
+
+        :param materialization_id: Required. The materialization id to stop.
+        """
+        self.log.info("Stopping PDT materialization. Job id: %s.", materialization_id)
+        self.log.debug("PDT materialization job source: '%s'.", self.source)
+
+        sdk = self.get_looker_sdk()
+        resp = sdk.stop_pdt_build(materialization_id=materialization_id, source=self.source)
+
+        self.log.info("Stop PDT build response: '%s'.", resp)
+        return resp
+
+    def wait_for_job(
+        self,
+        materialization_id: str,
+        wait_time: int = 10,
+        timeout: Optional[int] = None,
+    ) -> None:
+        """
+        Helper method which polls a PDT materialization job to check if it finishes.
+
+        :param materialization_id: Required. The materialization id to wait for.
+        :param wait_time: Optional. Number of seconds between checks.
+        :param timeout: Optional. Maximum number of seconds to wait for the job to finish.
+        """
+        self.log.info('Waiting for PDT materialization job to complete. Job id: %s.', materialization_id)
+
+        status = None
+        start = time.monotonic()
+
+        while status not in (
+            JobStatus.DONE.value,
+            JobStatus.ERROR.value,
+            JobStatus.CANCELLED.value,
+            JobStatus.UNKNOWN.value,
+        ):
+
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(
+                    f"Timeout: PDT materialization job is not ready after {timeout}s. "
+                    f"Job id: {materialization_id}."
+                )
+
+            time.sleep(wait_time)
+
+            status_dict = self.pdt_build_status(materialization_id=materialization_id)
+            status = status_dict['status']
+
+        if status == JobStatus.ERROR.value:
+            msg = status_dict['message']
+            raise AirflowException(
+                f'PDT materialization job failed. Job id: {materialization_id}. Message:\n"{msg}"'
+            )
+        if status == JobStatus.CANCELLED.value:
+            raise AirflowException(f'PDT materialization job was cancelled. Job id: {materialization_id}.')
+        if status == JobStatus.UNKNOWN.value:
+            raise AirflowException(
+                f'PDT materialization job has unknown status. Job id: {materialization_id}.'
+            )
+
+        self.log.info('PDT materialization job completed successfully. Job id: %s.', materialization_id)
+
+    def get_api_source(self):
+        """Returns origin of the API call."""
+
+        # Infer API source based on Airflow version, e.g.:
+        # "2.1.4+composer" -> Cloud Composer
+        # "2.1.4" -> Airflow
+
+        self.log.debug(
+            "Got the following version for connection id: '%s'. Version: '%s'.", self.looker_conn_id, version
+        )
+
+        return 'composer' if 'composer' in version else 'airflow'
+
+    def get_looker_sdk(self):
+        """Returns Looker SDK client for Looker API 4.0."""
+
+        conn = self.get_connection(self.looker_conn_id)
+        settings = LookerApiSettings(conn)
+
+        transport = requests_transport.RequestsTransport.configure(settings)
+        return methods40.Looker40SDK(
+            auth_session.AuthSession(settings, transport, serialize.deserialize40, "4.0"),
+            serialize.deserialize40,
+            serialize.serialize,
+            transport,
+            "4.0",
+        )
+
+
+class LookerApiSettings(api_settings.ApiSettings):
+    """Custom implementation of Looker SDK's `ApiSettings` class."""
+
+    def __init__(
+        self,
+        conn: Connection,
+    ) -> None:
+        self.conn = conn  # need to init before `read_config` is called in super
+        super().__init__()
+
+    def read_config(self) -> Dict[str, str]:
+        """
+        Overrides the default logic of getting connection settings. Fetches
+        the connection settings from Airflow's connection object.
+        """
+
+        config = {}
+
+        if self.conn.host is None:
+            raise AirflowException(f'No `host` was supplied in connection: {self.conn.conn_id}.')
+
+        if self.conn.port:
+            config["base_url"] = f"{self.conn.host}:{self.conn.port}"  # port is optional
+        else:
+            config["base_url"] = self.conn.host
+
+        if self.conn.login:
+            config["client_id"] = self.conn.login
+        else:
+            raise AirflowException(f'No `login` was supplied in connection: {self.conn.conn_id}.')
+
+        if self.conn.password:
+            config["client_secret"] = self.conn.password
+        else:
+            raise AirflowException(f'No `password` was supplied in connection: {self.conn.conn_id}.')
+
+        extras = self.conn.extra_dejson  # type: Dict
+
+        if 'verify_ssl' in extras:
+            config["verify_ssl"] = extras["verify_ssl"]  # optional
+
+        if 'timeout' in extras:
+            config["timeout"] = extras["timeout"]  # optional

Review comment:
       Connections are often configured as a URL, which unfortunately means these extra fields arrive as text. Should we be casting the types here? See: https://github.com/apache/airflow/pull/21155
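One way to address this, sketched under stated assumptions: coerce the two optional extras before they reach the Looker SDK settings. The helper name `coerce_looker_extras` and the set of accepted boolean spellings are hypothetical, not part of this PR or of the Looker SDK, and whether the SDK already coerces these values on its own is not confirmed here.

```python
from typing import Any, Dict

# Hypothetical spellings accepted as booleans; not taken from the PR.
_TRUE = {"true", "1", "yes", "on"}
_FALSE = {"false", "0", "no", "off"}


def coerce_looker_extras(extras: Dict[str, Any]) -> Dict[str, Any]:
    """Return a config fragment with ``verify_ssl`` as bool and ``timeout`` as int.

    Hypothetical helper: extras set through a connection URI arrive as strings,
    while extras set through the JSON ``extra`` field may already be typed.
    """
    config: Dict[str, Any] = {}

    if "verify_ssl" in extras:
        value = extras["verify_ssl"]
        if isinstance(value, str):
            lowered = value.strip().lower()
            if lowered in _TRUE:
                value = True
            elif lowered in _FALSE:
                value = False
            else:
                raise ValueError(f"Cannot interpret verify_ssl={value!r} as a boolean")
        config["verify_ssl"] = bool(value)

    if "timeout" in extras:
        # int() accepts both "120" and 120; a non-numeric string raises ValueError.
        config["timeout"] = int(extras["timeout"])

    return config
```

With a URI-style connection, `coerce_looker_extras({"verify_ssl": "false", "timeout": "120"})` yields a real `False` and `120` instead of the strings, so a value like `"false"` can no longer be treated as truthy downstream.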

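Separately from the type-casting question, the `wait_for_job` loop in the quoted hunk sleeps, fetches the status, and stops on any terminal state. Below is a self-contained sketch of the same control flow with the status lookup injected as a callable, so it can be exercised without a Looker instance. The `JobStatus` string values, the `wait_for_status` name, and the use of `RuntimeError`/`TimeoutError` in place of `AirflowException` are all assumptions; the real enum is defined outside the quoted hunk.

```python
import time
from enum import Enum
from typing import Callable, Dict, Optional


class JobStatus(Enum):
    # Hypothetical status strings; the PR's real enum is outside the quoted hunk.
    PENDING = "pending"
    RUNNING = "running"
    DONE = "done"
    ERROR = "error"
    CANCELLED = "cancelled"
    UNKNOWN = "unknown"


TERMINAL_STATUSES = {
    JobStatus.DONE.value,
    JobStatus.ERROR.value,
    JobStatus.CANCELLED.value,
    JobStatus.UNKNOWN.value,
}


def wait_for_status(
    fetch_status: Callable[[], Dict[str, str]],
    wait_time: float = 10,
    timeout: Optional[float] = None,
) -> Dict[str, str]:
    """Poll ``fetch_status`` until it reports a terminal status; return the final status dict."""
    start = time.monotonic()
    while True:
        # Check the deadline before sleeping, mirroring the quoted loop.
        if timeout is not None and time.monotonic() - start > timeout:
            raise TimeoutError(f"Job is not ready after {timeout}s.")
        time.sleep(wait_time)
        status_dict = fetch_status()
        if status_dict["status"] in TERMINAL_STATUSES:
            break

    status = status_dict["status"]
    if status == JobStatus.ERROR.value:
        raise RuntimeError(f"Job failed. Message: {status_dict.get('message')!r}")
    if status == JobStatus.CANCELLED.value:
        raise RuntimeError("Job was cancelled.")
    if status == JobStatus.UNKNOWN.value:
        raise RuntimeError("Job has unknown status.")
    return status_dict
```

One deliberate difference: this sketch checks `timeout is not None`, whereas the quoted `if timeout and ...` treats `timeout=0` as "no timeout".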






[GitHub] [airflow] mik-laj commented on a change in pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#discussion_r813954369



##########
File path: airflow/providers/google/cloud/hooks/looker.py
##########
@@ -0,0 +1,268 @@
+    def get_api_source(self):
+        """Returns origin of the API call."""
+
+        # Infer API source based on Airflow version, e.g.:
+        # "2.1.4+composer" -> Cloud Composer
+        # "2.1.4" -> Airflow
+
+        self.log.debug(
+            "Got the following version for connection id: '%s'. Version: '%s'.", self.looker_conn_id, version
+        )
+
+        return 'composer' if 'composer' in version else 'airflow'

Review comment:
       We don't want any SaaS-specific logic in our code. If you need to identify the environment, you can read an environment variable and configure this parameter based on it.
   For example, see: 
   https://github.com/apache/airflow/blob/272d242316a1d163598d93511e2bfd7b717b4dce/airflow/providers/snowflake/hooks/snowflake.py#L184
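A minimal sketch of that suggestion, in the spirit of the linked Snowflake hook: the hook reads its `source` value from an environment variable that a hosting platform can set, and otherwise falls back to a generic default. The variable name `LOOKER_API_SOURCE` and the module-level constants are hypothetical; only the `'airflow'` default mirrors the value returned by the quoted `get_api_source`.

```python
import os
from typing import Mapping, Optional

# Hypothetical name: not defined by the PR or by the Snowflake hook.
API_SOURCE_ENV_VAR = "LOOKER_API_SOURCE"
DEFAULT_API_SOURCE = "airflow"


def get_api_source(environ: Optional[Mapping[str, str]] = None) -> str:
    """Return the ``source`` value to send with PDT API calls.

    The value comes from deployment configuration (an environment variable)
    rather than from parsing the Airflow version string, so the provider
    contains no platform-specific checks.
    """
    env = os.environ if environ is None else environ
    return env.get(API_SOURCE_ENV_VAR, DEFAULT_API_SOURCE)
```

A managed environment would export the variable in its worker configuration, while a plain Airflow install would leave it unset and report the default. Accepting an optional mapping keeps the function testable without touching the real process environment.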







[GitHub] [airflow] alekseiloginov commented on pull request #20882: [WIP] Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1049880526


   /retest





[GitHub] [airflow] alekseiloginov commented on pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1059202623


   CI is failing with a dependency conflict between `apache-airflow` and `apache-airflow[devel-ci]`:
   ```
     #32 96.41 ERROR: Cannot install apache-airflow and apache-airflow[devel-ci]==2.3.0.dev0 because these package versions have conflicting dependencies.
     #32 96.41 
     #32 96.41 The conflict is caused by:
     #32 96.41     apache-airflow[devel-ci] 2.3.0.dev0 depends on cloudpickle<1.5.0 and >=1.4.1
     #32 96.41     apache-beam 2.36.0 depends on cloudpickle<3 and >=2.0.0
   ```
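The conflict in the log comes from two `cloudpickle` ranges with no common version: `>=1.4.1,<1.5.0` (from `apache-airflow[devel-ci]`) and `>=2.0.0,<3` (from `apache-beam` 2.36.0). The stdlib-only sketch below checks this by treating each range as a half-open interval over plain dotted versions. It is a simplification of PEP 440 (no pre-, post- or dev-releases); `packaging.specifiers.SpecifierSet` is the robust tool for real specifiers.

```python
from typing import Tuple


def parse_release(version: str) -> Tuple[int, ...]:
    """Parse a plain dotted release such as '1.4.1' into a comparable tuple.

    Trailing zeros are dropped so that '3', '3.0' and '3.0.0' compare equal.
    """
    parts = [int(piece) for piece in version.split(".")]
    while len(parts) > 1 and parts[-1] == 0:
        parts.pop()
    return tuple(parts)


def ranges_overlap(low_a: str, high_a: str, low_b: str, high_b: str) -> bool:
    """Return True if some version satisfies both '>=low_a,<high_a' and '>=low_b,<high_b'."""
    # The larger lower bound is the earliest candidate; it works only if it is
    # below both upper bounds.
    latest_low = max(parse_release(low_a), parse_release(low_b))
    earliest_high = min(parse_release(high_a), parse_release(high_b))
    return latest_low < earliest_high


# apache-airflow[devel-ci] 2.3.0.dev0 vs apache-beam 2.36.0, as reported by pip:
print(ranges_overlap("1.4.1", "1.5.0", "2.0.0", "3"))  # False: no cloudpickle release fits both
```

Since the earliest version Beam accepts (2.0.0) is already above the devel-ci ceiling (1.5.0), one of the two pins has to move before pip can resolve the environment.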





[GitHub] [airflow] alekseiloginov commented on pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1058181035


   @potiuk We added this as a short-term workaround while we were fixing the dependency issue (before that, I couldn't use looker-sdk with Airflow at all due to another dependency conflict, so this at least unblocked me to test the integration).
   
   Unfortunately, `looker-sdk>=22.2.0` is required as the new API that we use for the integration was introduced in `22.2.0`.
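Because the PDT endpoints only exist from `looker-sdk` 22.2.0 onward, a caller might want to fail fast when an older SDK is installed. Below is a stdlib-only sketch using `importlib.metadata`; the helper names are hypothetical, and the numeric-prefix parsing is a simplification that ignores PEP 440 pre-release ordering (so `22.2.0rc1` would wrongly pass). The hook quoted earlier in this thread uses `packaging.version.parse` for its Looker release check, which handles those cases properly.

```python
import re
from importlib.metadata import PackageNotFoundError, version
from typing import Tuple


def numeric_prefix(ver: str) -> Tuple[int, ...]:
    """Return the leading numeric release components, e.g. '22.2.0' -> (22, 2).

    Parsing stops at the first non-numeric component, and trailing zeros are
    dropped so '22.2' and '22.2.0' compare equal.
    """
    parts = []
    for piece in ver.split("."):
        match = re.match(r"\d+", piece)
        if match is None:
            break
        parts.append(int(match.group()))
        if match.end() != len(piece):
            break  # e.g. '0rc1': keep the 0, ignore the suffix
    while len(parts) > 1 and parts[-1] == 0:
        parts.pop()
    return tuple(parts)


def installed_at_least(distribution: str, minimum: str) -> bool:
    """True if ``distribution`` is installed with a release >= ``minimum``."""
    try:
        found = version(distribution)
    except PackageNotFoundError:
        return False
    return numeric_prefix(found) >= numeric_prefix(minimum)


if not installed_at_least("looker-sdk", "22.2.0"):
    print("looker-sdk>=22.2.0 is required for the PDT start/stop API")
```

The install-time pin (`looker-sdk>=22.2.0` in the provider's requirements) remains the primary guard; a runtime check like this only gives a clearer message when an environment was assembled some other way.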





[GitHub] [airflow] alekseiloginov commented on pull request #20882: Add Looker PDT operators

Posted by GitBox <gi...@apache.org>.
alekseiloginov commented on pull request #20882:
URL: https://github.com/apache/airflow/pull/20882#issuecomment-1058241182


   Since @potiuk kindly offered to hold the release for a day, we will try to resolve the dependency issue and unpin Python 3.9.9 by tomorrow.

