Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/08/12 21:01:08 UTC

[GitHub] [airflow] michalslowikowski00 opened a new pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

michalslowikowski00 opened a new pull request #10304:
URL: https://github.com/apache/airflow/pull/10304


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475511304



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -82,11 +83,11 @@ def get_job_group(self, job_id: int, embed: str, include_deleted: bool) -> Dict[
         A job group is a job that is executed from a specific node in a flow.
 
         :param job_id The ID of the job that will be fetched.
-        :type job_id: int
+        :type job_id: int.

Review comment:
       done







[GitHub] [airflow] mik-laj commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475658128



##########
File path: tests/providers/google/cloud/operators/test_dataprep_system.py
##########
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from os import environ
+
+from airflow.models import Connection
+from airflow.utils.session import create_session
+from tests.test_utils.db import clear_db_connections
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest
+
+TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")
+EXTRA = {"token": TOKEN}
+
+
+class DataprepExampleDagsTest(GoogleSystemTest):

Review comment:
       We don't use Google authorization, so we shouldn't use GoogleSystemTest. 







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r478213435



##########
File path: airflow/providers/google/cloud/operators/dataprep.py
##########
@@ -49,6 +48,68 @@ def __init__(self, *, job_id: int, **kwargs) -> None:
 
     def execute(self, context: Dict):
         self.log.info("Fetching data for job with id: %d ...", self.job_id)
-        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id",)
         response = hook.get_jobs_for_job_group(job_id=self.job_id)
         return response
+
+
+class DataprepGetJobGroupOperator(BaseOperator):
+    """
+    Get the specified job group.
+    A job group is a job that is executed from a specific node in a flow.
+    API documentation https://clouddataprep.com/documentation/api#section/Overview
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataprepGetJobGroupOperator`
+
+    :param job_group_id The ID of the job that will be requests
+    :type job_group_id: int
+    :param embed Comma-separated list of objects to pull in as part of the response
+    :type embed: string
+    :param include_deleted if set to "true", will include deleted objects
+    :type include_deleted: bool
+    """
+
+    template_fields = ("job_group_id",)

Review comment:
       Done.







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r480967646



##########
File path: airflow/providers/google/cloud/operators/dataprep.py
##########
@@ -35,20 +35,90 @@ class DataprepGetJobsForJobGroupOperator(BaseOperator):
         For more information on how to use this operator, take a look at the guide:
         :ref:`howto/operator:DataprepGetJobsForJobGroupOperator`
 
-
     :param job_id The ID of the job that will be requests
     :type job_id: int
     """
 
     template_fields = ("job_id",)
 
     @apply_defaults
-    def __init__(self, *, job_id: int, **kwargs) -> None:
+    def __init__(self, *, dataprep_conn_id: str = "dataprep_default", job_id: int, **kwargs) -> None:
         super().__init__(**kwargs)
+        self.dataprep_conn_id = (dataprep_conn_id,)
         self.job_id = job_id
 
     def execute(self, context: Dict):
         self.log.info("Fetching data for job with id: %d ...", self.job_id)
-        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_default",)
         response = hook.get_jobs_for_job_group(job_id=self.job_id)
         return response
+
+
+class DataprepGetJobGroupOperator(BaseOperator):
+    """
+    Get the specified job group.
+    A job group is a job that is executed from a specific node in a flow.
+    API documentation https://clouddataprep.com/documentation/api#section/Overview
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataprepGetJobGroupOperator`
+
+    :param job_group_id The ID of the job that will be requests

Review comment:
       ```suggestion
       :param job_group_id: The ID of the job that will be requests
   ```







[GitHub] [airflow] mik-laj commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475530855



##########
File path: docs/howto/operator/google/cloud/dataprep.rst
##########
@@ -17,7 +17,28 @@
 
 Google Dataprep Operators
 =========================
-`Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
+Dataprep is the intelligent cloud data service to visually explore, clean, and prepare data for analysis and machine learning.
+Service can be use to explore and transform raw data from disparate and/or large datasets into clean and structured data for further analysis and processing.
+Dataprep Job is an internal object encoding the information necessary to run a part of a Cloud Dataprep job group.
+For more information about the service visit `Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
+
+Before using Dataprep within Airflow you need to authenticate your account with TOKEN.

Review comment:
       Can you add a section header here?







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r472901010



##########
File path: tests/providers/google/cloud/operators/test_dataprep_system.py
##########
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from os import environ
+
+from airflow.models import Connection
+from airflow.utils.session import create_session
+from tests.test_utils.db import clear_db_connections
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER
+from tests.test_utils.system_tests_class import SystemTest
+
+TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")
+EXTRA = {"token": TOKEN}
+
+
+class DataprepExampleDagsTest(SystemTest):

Review comment:
       ```suggestion
   class DataprepExampleDagsTest(GoogleSystemTest):
   ```
   This will automatically add `@pytest.mark.system("google")`. By the way, do we need any setup/teardown that creates buckets?







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r472955771



##########
File path: tests/providers/google/cloud/operators/test_dataprep_system.py
##########
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from os import environ
+
+from airflow.models import Connection
+from airflow.utils.session import create_session
+from tests.test_utils.db import clear_db_connections
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER
+from tests.test_utils.system_tests_class import SystemTest
+
+TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")
+EXTRA = {"token": TOKEN}
+
+
+class DataprepExampleDagsTest(SystemTest):

Review comment:
       We can wait for the new operator in this PR. That way we will have a fully functional DAG and there will be no need to create a pipeline in setup.







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r477614337



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -50,26 +54,60 @@ def _headers(self) -> Dict[str, str]:
         }
         return headers
 
-    @property
-    def _token(self) -> str:
-        conn = self.get_connection(self.dataprep_conn_id)
-        token = conn.extra_dejson.get("token")
-        if token is None:
-            raise AirflowException(
-                "Dataprep token is missing or has invalid format. "
-                "Please make sure that Dataprep token is added to the Airflow Connections."
-            )
-        return token
-
     @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
     def get_jobs_for_job_group(self, job_id: int) -> Dict[str, Any]:
         """
         Get information about the batch jobs within a Cloud Dataprep job.
 
-        :param job_id The ID of the job that will be fetched.
+        :param job_id The ID of the job that will be fetched
         :type job_id: int
         """
-        url: str = f"{self._url}/{job_id}/jobs"
+
+        endpoint_path = f"/v4/jobGroups/{job_id}/jobs"
+        url: str = os.path.join(self._base_url, endpoint_path)

Review comment:
       `os.path.join` discards every earlier component when a later one starts with a slash, so the endpoint path must not have a leading slash:
   ```suggestion
           endpoint_path = f"v4/jobGroups/{job_id}/jobs"
           url: str = os.path.join(self._base_url, endpoint_path)
   ```
   because:
   ```
   In [2]: os.path.join("https://app:123/path/", "/api/v1/")
   Out[2]: '/api/v1/'
   
   In [3]: os.path.join("https://app:123/path/", "api/v1/")
   Out[3]: 'https://app:123/path/api/v1/'
   ```
   And indeed, in the logs I see:
   ```
   requests.exceptions.MissingSchema: Invalid URL '/v4/jobGroups': No schema supplied.
   ```
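
The join behaviour described in this comment can be reproduced with the standard library alone. The following is a minimal sketch, not code from the PR: it uses `posixpath.join` (which is what `os.path.join` resolves to on POSIX systems), the base URL is the one from the example above, and the `build_url` helper is a hypothetical alternative based on `urllib.parse.urljoin`.

```python
import posixpath
from urllib.parse import urljoin

# Base URL copied from the interactive example in the review comment.
base_url = "https://app:123/path/"

# A component with a leading slash is treated as absolute,
# so everything before it is dropped.
broken = posixpath.join(base_url, "/api/v1/")

# Without the leading slash the component is appended to the base.
joined = posixpath.join(base_url, "api/v1/")


def build_url(base: str, endpoint: str) -> str:
    """Hypothetical helper: join URL parts without relying on filesystem path rules."""
    # Normalise both sides so the result does not depend on stray slashes.
    return urljoin(base.rstrip("/") + "/", endpoint.lstrip("/"))


print(broken)
print(joined)
print(build_url(base_url, "/v4/jobGroups/1/jobs"))
```

Joining with `urljoin` after normalising the slashes also avoids depending on the platform's path separator, which `os.path.join` would otherwise introduce on Windows.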







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r476320009



##########
File path: docs/howto/operator/google/cloud/dataprep.rst
##########
@@ -17,7 +17,28 @@
 
 Google Dataprep Operators
 =========================
-`Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
+Dataprep is the intelligent cloud data service to visually explore, clean, and prepare data for analysis and machine learning.
+Service can be use to explore and transform raw data from disparate and/or large datasets into clean and structured data for further analysis and processing.
+Dataprep Job is an internal object encoding the information necessary to run a part of a Cloud Dataprep job group.
+For more information about the service visit `Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
+
+Before using Dataprep within Airflow you need to authenticate your account with TOKEN.
+To get connection Dataprep with Airflow you need Dataprep token. Please follow Dataprep instructions to do it.
+https://clouddataprep.com/documentation/api#section/Authentication
+
+TOKEN should be added to the Connection in Airflow in JSON format.
+Her you can check how to do such connection:
+https://airflow.readthedocs.io/en/stable/howto/connection/index.html#editing-a-connection-with-the-ui

Review comment:
       Yes.







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r477224927



##########
File path: tests/conftest.py
##########
@@ -251,6 +253,9 @@ def pytest_configure(config):
     config.addinivalue_line(
         "markers", "airflow_2: mark tests that works only on Airflow 2.0 / master"
     )
+    config.addinivalue_line(
+        "markers", "credential_env(name): mark tests that require credential env in CREDENTIALS_ENV"
+    )

Review comment:
       I think it would be simpler to add
   ```python
   @pytest.mark.skipif(os.environ.get("DATAPREP_TOKEN") is None, reason='Dataprep token not present')
   ```
   WDYT?
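
For reference, pytest's `skipif` marker takes the skip message as the `reason` keyword argument. The same idea can be sketched with only the standard library using `unittest.skipUnless`; this is an illustrative analogue, not the change made in the PR. The helper `dataprep_token_present` and the class `DataprepSystemSketch` are hypothetical names, and the sketch assumes an empty `DATAPREP_TOKEN` should count as absent.

```python
import os
import unittest


def dataprep_token_present(env=os.environ) -> bool:
    """Return True when a non-empty DATAPREP_TOKEN is set in the given mapping."""
    return bool(env.get("DATAPREP_TOKEN"))


# The whole test case is skipped when no token is configured.
@unittest.skipUnless(dataprep_token_present(), "Dataprep token not present")
class DataprepSystemSketch(unittest.TestCase):
    """Hypothetical system test that needs a real Dataprep token."""

    def test_token_is_readable(self):
        self.assertTrue(os.environ["DATAPREP_TOKEN"])
```

The condition is evaluated once, when the class is defined, so the environment variable has to be set before the test module is imported.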







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r478241590



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -50,26 +54,60 @@ def _headers(self) -> Dict[str, str]:
         }
         return headers
 
-    @property
-    def _token(self) -> str:
-        conn = self.get_connection(self.dataprep_conn_id)
-        token = conn.extra_dejson.get("token")
-        if token is None:
-            raise AirflowException(
-                "Dataprep token is missing or has invalid format. "
-                "Please make sure that Dataprep token is added to the Airflow Connections."
-            )
-        return token
-
     @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
     def get_jobs_for_job_group(self, job_id: int) -> Dict[str, Any]:
         """
         Get information about the batch jobs within a Cloud Dataprep job.
 
-        :param job_id The ID of the job that will be fetched.
+        :param job_id The ID of the job that will be fetched
         :type job_id: int
         """
-        url: str = f"{self._url}/{job_id}/jobs"
+
+        endpoint_path = f"/v4/jobGroups/{job_id}/jobs"
+        url: str = os.path.join(self._base_url, endpoint_path)
         response = requests.get(url, headers=self._headers)
         response.raise_for_status()

Review comment:
       ```python
       def _raise_for_status(self, response):
           try:
               response.raise_for_status()
           except HTTPError:
               self.log.error(response.json().get('exception'))
               raise
   ```
   this may help
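
The suggested pattern (log the error body, then re-raise) can be tried without a network call. The sketch below is illustrative only: `HTTPError` and `FakeResponse` are hypothetical stand-ins for `requests.exceptions.HTTPError` and a `requests` response, and the `exception` key is taken from the snippet above rather than from any documented API contract.

```python
import logging


class HTTPError(Exception):
    """Stand-in for requests.exceptions.HTTPError so the sketch runs without requests."""


class FakeResponse:
    """Hypothetical stub that mimics the two response methods used below."""

    def __init__(self, status_code, payload):
        self.status_code = status_code
        self._payload = payload

    def raise_for_status(self):
        # Mirror the real behaviour: only 4xx/5xx responses raise.
        if self.status_code >= 400:
            raise HTTPError(f"{self.status_code} error")

    def json(self):
        return self._payload


def raise_for_status_logged(response, log):
    """Log the 'exception' field of the response body, then re-raise the HTTP error."""
    try:
        response.raise_for_status()
    except HTTPError:
        log.error(response.json().get("exception"))
        raise


log = logging.getLogger("dataprep_sketch")
# A successful response passes through without logging or raising.
raise_for_status_logged(FakeResponse(200, {}), log)
```

Re-raising with a bare `raise` keeps the original traceback, so the retry decorator on the hook method still sees the same exception type.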







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r476350447



##########
File path: docs/howto/operator/google/cloud/dataprep.rst
##########
@@ -17,7 +17,28 @@
 
 Google Dataprep Operators
 =========================
-`Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
+Dataprep is the intelligent cloud data service to visually explore, clean, and prepare data for analysis and machine learning.
+Service can be use to explore and transform raw data from disparate and/or large datasets into clean and structured data for further analysis and processing.
+Dataprep Job is an internal object encoding the information necessary to run a part of a Cloud Dataprep job group.
+For more information about the service visit `Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
+
+Before using Dataprep within Airflow you need to authenticate your account with TOKEN.

Review comment:
       Done.







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r472905659



##########
File path: tests/providers/google/cloud/operators/test_dataprep_system.py
##########
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from os import environ
+
+from airflow.models import Connection
+from airflow.utils.session import create_session
+from tests.test_utils.db import clear_db_connections
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER
+from tests.test_utils.system_tests_class import SystemTest
+
+TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")
+EXTRA = {"token": TOKEN}
+
+
+class DataprepExampleDagsTest(SystemTest):

Review comment:
       OK, I will add it. We do not need to create buckets in setup, unless I am missing something...







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475648917



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -56,7 +57,7 @@ def _token(self) -> str:
         token = conn.extra_dejson.get("token")
         if token is None:
             raise AirflowException(
-                "Dataprep token is missing or has invalid format. "
+                "Dataprep token is missing or has invalid format."

Review comment:
       Done.







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r477551063



##########
File path: tests/conftest.py
##########
@@ -251,6 +253,9 @@ def pytest_configure(config):
     config.addinivalue_line(
         "markers", "airflow_2: mark tests that works only on Airflow 2.0 / master"
     )
+    config.addinivalue_line(
+        "markers", "credential_env(name): mark tests that require credential env in CREDENTIALS_ENV"
+    )

Review comment:
       Done.







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r474802652



##########
File path: docs/howto/operator/google/cloud/dataprep.rst
##########
@@ -17,7 +17,25 @@
 
 Google Dataprep Operators
 =========================
-`Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
+Dataprep is the intelligent cloud data service to visually explore, clean, and prepare data for analysis and machine learning.
+Service can be use to explore and transform raw data from disparate and/or large datasets into clean and structured data for further analysis and processing.
+Dataprep Job is an internal object encoding the information necessary to run a part of a Cloud Dataprep job group.
+For more information about the service visit `Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
+
+Before using Dataprep within Airflow you need to authenticate your account with TOKEN.
+To get connection Dataprep with Airflow you need Dataprep token. Please follow Dataprep instructions to do it.
+https://clouddataprep.com/documentation/api#section/Authentication
+
+TOKEN should be added to the Connection in Airflow in JSON format.
+Her you can check how to do such connection:
+https://airflow.readthedocs.io/en/stable/howto/connection/index.html#editing-a-connection-with-the-ui
+
+See following example:
+Set values for these fields:
+.. code-block::
+
+  Conn Id: "your_conn_id"
+  Extra: {"token": "TOKEN"}

Review comment:
       We should change this to
   ```suggestion
     Extra: {"dataprep__extra__token": "TOKEN"}
   ```







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r478214909



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -50,26 +54,60 @@ def _headers(self) -> Dict[str, str]:
         }
         return headers
 
-    @property
-    def _token(self) -> str:
-        conn = self.get_connection(self.dataprep_conn_id)
-        token = conn.extra_dejson.get("token")
-        if token is None:
-            raise AirflowException(
-                "Dataprep token is missing or has invalid format. "
-                "Please make sure that Dataprep token is added to the Airflow Connections."
-            )
-        return token
-
     @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
     def get_jobs_for_job_group(self, job_id: int) -> Dict[str, Any]:
         """
         Get information about the batch jobs within a Cloud Dataprep job.
 
-        :param job_id The ID of the job that will be fetched.
+        :param job_id The ID of the job that will be fetched
         :type job_id: int
         """
-        url: str = f"{self._url}/{job_id}/jobs"
+
+        endpoint_path = f"/v4/jobGroups/{job_id}/jobs"
+        url: str = os.path.join(self._base_url, endpoint_path)

Review comment:
       Done.







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r477225763



##########
File path: tests/providers/google/cloud/operators/test_dataprep.py
##########
@@ -26,14 +26,15 @@
 INCLUDE_DELETED = False
 EMBED = ""
 DATAPREP_JOB_RECIPE_ID = 1234567
+PATH_TO_OUTOUT_FILE = "path_to_output_file"

Review comment:
       If you use the same variables in the example DAG, it would be wise to import them from the example to keep the test and the example consistent.







[GitHub] [airflow] mik-laj commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475533033



##########
File path: tests/providers/google/cloud/operators/test_dataprep_system.py
##########
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from os import environ
+
+from airflow.models import Connection
+from airflow.utils.session import create_session
+from tests.test_utils.db import clear_db_connections
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest
+
+TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")

Review comment:
       We should skip this test when this environment variable is not set, rather than fall back to a dummy value.
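
A minimal sketch of that approach, using only the standard library's `unittest` so that it stays self-contained (the PR itself may well prefer a pytest marker). The class and test names are hypothetical.

```python
import os
import unittest

# Read the token without a dummy fallback; None means "not configured".
TOKEN = os.environ.get("DATAPREP_TOKEN")


@unittest.skipIf(TOKEN is None, "DATAPREP_TOKEN is not set")
class DataprepSystemTestSketch(unittest.TestCase):
    """Hypothetical system test that only runs when a real token is available."""

    def test_token_is_available(self):
        self.assertTrue(TOKEN)
```

With this shape, a missing credential shows up as a skipped test with a reason, instead of a run against a placeholder token.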







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r480967419



##########
File path: airflow/providers/google/cloud/operators/dataprep.py
##########
@@ -35,20 +35,90 @@ class DataprepGetJobsForJobGroupOperator(BaseOperator):
         For more information on how to use this operator, take a look at the guide:
         :ref:`howto/operator:DataprepGetJobsForJobGroupOperator`
 
-
     :param job_id The ID of the job that will be requests
     :type job_id: int
     """
 
     template_fields = ("job_id",)
 
     @apply_defaults
-    def __init__(self, *, job_id: int, **kwargs) -> None:
+    def __init__(self, *, dataprep_conn_id: str = "dataprep_default", job_id: int, **kwargs) -> None:
         super().__init__(**kwargs)
+        self.dataprep_conn_id = (dataprep_conn_id,)
         self.job_id = job_id
 
     def execute(self, context: Dict):
         self.log.info("Fetching data for job with id: %d ...", self.job_id)
-        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_default",)
         response = hook.get_jobs_for_job_group(job_id=self.job_id)
         return response
+
+
+class DataprepGetJobGroupOperator(BaseOperator):
+    """
+    Get the specified job group.
+    A job group is a job that is executed from a specific node in a flow.
+    API documentation https://clouddataprep.com/documentation/api#section/Overview
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataprepGetJobGroupOperator`
+
+    :param job_group_id The ID of the job that will be requests
+    :type job_group_id: int
+    :param embed Comma-separated list of objects to pull in as part of the response
+    :type embed: string
+    :param include_deleted if set to "true", will include deleted objects

Review comment:
       ```suggestion
       :param include_deleted: if set to "true", will include deleted objects
   ```

##########
File path: airflow/providers/google/cloud/operators/dataprep.py
##########
@@ -35,20 +35,90 @@ class DataprepGetJobsForJobGroupOperator(BaseOperator):
         For more information on how to use this operator, take a look at the guide:
         :ref:`howto/operator:DataprepGetJobsForJobGroupOperator`
 
-
     :param job_id The ID of the job that will be requests
     :type job_id: int
     """
 
     template_fields = ("job_id",)
 
     @apply_defaults
-    def __init__(self, *, job_id: int, **kwargs) -> None:
+    def __init__(self, *, dataprep_conn_id: str = "dataprep_default", job_id: int, **kwargs) -> None:
         super().__init__(**kwargs)
+        self.dataprep_conn_id = (dataprep_conn_id,)
         self.job_id = job_id
 
     def execute(self, context: Dict):
         self.log.info("Fetching data for job with id: %d ...", self.job_id)
-        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_default",)
         response = hook.get_jobs_for_job_group(job_id=self.job_id)
         return response
+
+
+class DataprepGetJobGroupOperator(BaseOperator):
+    """
+    Get the specified job group.
+    A job group is a job that is executed from a specific node in a flow.
+    API documentation https://clouddataprep.com/documentation/api#section/Overview
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataprepGetJobGroupOperator`
+
+    :param job_group_id The ID of the job that will be requests
+    :type job_group_id: int
+    :param embed Comma-separated list of objects to pull in as part of the response

Review comment:
       ```suggestion
       :param embed: Comma-separated list of objects to pull in as part of the response
   ```
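
Separately, a small sketch of something else visible in this hunk: the trailing comma in `self.dataprep_conn_id = (dataprep_conn_id,)` makes the attribute a one-element tuple rather than a string, and `execute` passes the literal `"dataprep_default"` instead of the stored value. The classes below are hypothetical stand-ins, not the real operator or hook.

```python
class HookSketch:
    """Hypothetical stand-in for the hook; it only records the connection id."""

    def __init__(self, dataprep_conn_id: str) -> None:
        self.dataprep_conn_id = dataprep_conn_id


class OperatorSketch:
    """Hypothetical stand-in for the operator."""

    def __init__(self, *, dataprep_conn_id: str = "dataprep_default") -> None:
        # Plain assignment: no parentheses and no trailing comma.
        self.dataprep_conn_id = dataprep_conn_id

    def execute(self) -> HookSketch:
        # Forward the stored connection id instead of a hardcoded literal.
        return HookSketch(dataprep_conn_id=self.dataprep_conn_id)


# What the assignment in the hunk produces: a tuple, not a str.
as_written = ("dataprep_default",)

hook = OperatorSketch(dataprep_conn_id="my_dataprep").execute()
```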







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r478216262



##########
File path: airflow/providers/google/cloud/operators/dataprep.py
##########
@@ -49,6 +48,68 @@ def __init__(self, *, job_id: int, **kwargs) -> None:
 
     def execute(self, context: Dict):
         self.log.info("Fetching data for job with id: %d ...", self.job_id)
-        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id",)
         response = hook.get_jobs_for_job_group(job_id=self.job_id)
         return response
+
+
+class DataprepGetJobGroupOperator(BaseOperator):
+    """
+    Get the specified job group.
+    A job group is a job that is executed from a specific node in a flow.
+    API documentation https://clouddataprep.com/documentation/api#section/Overview
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataprepGetJobGroupOperator`
+
+    :param job_group_id The ID of the job that will be requests
+    :type job_group_id: int
+    :param embed Comma-separated list of objects to pull in as part of the response
+    :type embed: string
+    :param include_deleted if set to "true", will include deleted objects
+    :type include_deleted: bool
+    """
+
+    template_fields = ("job_group_id",)
+
+    @apply_defaults
+    def __init__(
+        self, *, job_group_id: int, embed: str, include_deleted: bool, **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+        self.job_group_id = job_group_id
+        self.embed = embed
+        self.include_deleted = include_deleted
+
+    def execute(self, context: Dict):
+        self.log.info("Fetching data for job with id: %d ...", self.job_group_id)
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        response = hook.get_job_group(
+            job_group_id=self.job_group_id,
+            embed=self.embed,
+            include_deleted=self.include_deleted,
+        )
+        return response
+
+
+class DataprepRunJobGroupOperator(BaseOperator):
+    """
+    Create a ``jobGroup``, which launches the specified job as the authenticated user.
+    This performs the same action as clicking on the Run Job button in the application.
+    To get recipe_id please follow the Dataprep API documentation
+    https://clouddataprep.com/documentation/api#operation/runJobGroup
+
+    :param recipe_id: The identifier for the recipe you would like to run.
+    :type recipe_id: int
+    """
+

Review comment:
       Done.







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r477613123



##########
File path: tests/providers/google/cloud/operators/test_dataprep_system.py
##########
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from os import environ
+
+import pytest
+
+from airflow.models import Connection
+from airflow.utils.session import create_session
+from tests.test_utils.db import clear_db_connections
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest
+
+TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")
+EXTRA = {"token": TOKEN}
+
+
+@pytest.mark.skipif(environ.get("DATAPREP_TOKEN") is None, 'Dataprep token not present')

Review comment:
       ```suggestion
   @pytest.mark.skipif(environ.get("DATAPREP_TOKEN") is None, reason='Dataprep token not present')
   ```
   According to the error I get when running this test...
   ```
   Error evaluating 'skipif': you need to specify reason=STRING when using booleans as conditions.
   ```







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475511637



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -82,11 +83,11 @@ def get_job_group(self, job_id: int, embed: str, include_deleted: bool) -> Dict[
         A job group is a job that is executed from a specific node in a flow.
 
         :param job_id The ID of the job that will be fetched.
-        :type job_id: int
+        :type job_id: int.
         :param embed Comma-separated list of objects to pull in as part of the response.
-        :type embed: string
+        :type embed: string.

Review comment:
       My mistake. 

##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -82,11 +83,11 @@ def get_job_group(self, job_id: int, embed: str, include_deleted: bool) -> Dict[
         A job group is a job that is executed from a specific node in a flow.
 
         :param job_id The ID of the job that will be fetched.
-        :type job_id: int
+        :type job_id: int.
         :param embed Comma-separated list of objects to pull in as part of the response.
-        :type embed: string
+        :type embed: string.

Review comment:
       done







[GitHub] [airflow] mik-laj commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475530855



##########
File path: docs/howto/operator/google/cloud/dataprep.rst
##########
@@ -17,7 +17,28 @@
 
 Google Dataprep Operators
 =========================
-`Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
+Dataprep is the intelligent cloud data service to visually explore, clean, and prepare data for analysis and machine learning.
+Service can be use to explore and transform raw data from disparate and/or large datasets into clean and structured data for further analysis and processing.
+Dataprep Job is an internal object encoding the information necessary to run a part of a Cloud Dataprep job group.
+For more information about the service visit `Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
+
+Before using Dataprep within Airflow you need to authenticate your account with TOKEN.

Review comment:
       Can you add a section header here? Ideally, it should be called "Before you begin".







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r477198132



##########
File path: tests/providers/google/cloud/operators/test_dataprep_system.py
##########
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from os import environ
+
+from airflow.models import Connection
+from airflow.utils.session import create_session
+from tests.test_utils.db import clear_db_connections
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest
+
+TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")
+EXTRA = {"token": TOKEN}
+
+
+class DataprepExampleDagsTest(GoogleSystemTest):

Review comment:
       I talked about it with Kamil yesterday and he explained everything to me.

##########
File path: airflow/providers/google/cloud/operators/dataprep.py
##########
@@ -51,6 +50,68 @@ def __init__(
 
     def execute(self, context: Dict):
         self.log.info("Fetching data for job with id: %d ...", self.job_id)
-        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id",)
         response = hook.get_jobs_for_job_group(job_id=self.job_id)
         return response
+
+
+class DataprepGetJobGroupOperator(BaseOperator):
+    """
+    Get the specified job group.
+    A job group is a job that is executed from a specific node in a flow.
+    API documentation https://clouddataprep.com/documentation/api#section/Overview
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataprepGetJobGroupOperator`
+
+    :param job_id The ID of the job that will be requests
+    :type job_id: int
+    :param embed Comma-separated list of objects to pull in as part of the response
+    :type embed: string
+    :param include_deleted if set to "true", will include deleted objects
+    :type include_deleted: bool
+    """
+
+    template_fields = ("job_id",)
+
+    @apply_defaults
+    def __init__(
+        self, *, job_id: int, embed: str, include_deleted: bool, **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+        self.job_id = job_id
+        self.embed = embed
+        self.include_deleted = include_deleted
+
+    def execute(self, context: Dict):
+        self.log.info("Fetching data for job with id: %d ...", self.job_id)
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        response = hook.get_job_group(job_id=self.job_id, embed="", include_deleted=self.include_deleted)
+        return response
+
+
+class DataprepRunJobGroupOperator(BaseOperator):
+    """
+    Create a ``jobGroup``, which launches the specified job as the authenticated user.
+    This performs the same action as clicking on the Run Job button in the application.
+
+    :param recipe_id: to run a job, you just specify the recipe identifier (``wrangledDataset.id``).
+        If the job is successful, all defined outputs are generated,
+        as defined in the output object, publications, and ``writeSettings`` objects
+        associated with the recipe.
+        To identify the ``wrangledDataset Id``, select the recipe icon in the UI flow view and
+        take the id shown in the URL. e.g. if the URL is /flows/10?recipe=7,

Review comment:
       Done.







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r477608450



##########
File path: airflow/providers/google/cloud/operators/dataprep.py
##########
@@ -49,6 +48,68 @@ def __init__(self, *, job_id: int, **kwargs) -> None:
 
     def execute(self, context: Dict):
         self.log.info("Fetching data for job with id: %d ...", self.job_id)
-        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id",)
         response = hook.get_jobs_for_job_group(job_id=self.job_id)
         return response
+
+
+class DataprepGetJobGroupOperator(BaseOperator):
+    """
+    Get the specified job group.
+    A job group is a job that is executed from a specific node in a flow.
+    API documentation https://clouddataprep.com/documentation/api#section/Overview
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataprepGetJobGroupOperator`
+
+    :param job_group_id The ID of the job that will be requests
+    :type job_group_id: int
+    :param embed Comma-separated list of objects to pull in as part of the response
+    :type embed: string
+    :param include_deleted if set to "true", will include deleted objects
+    :type include_deleted: bool
+    """
+
+    template_fields = ("job_group_id",)

Review comment:
       ```suggestion
       template_fields = ("job_group_id", "embed")
   ```







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475651232



##########
File path: tests/providers/google/cloud/operators/test_dataprep_system.py
##########
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from os import environ
+
+from airflow.models import Connection
+from airflow.utils.session import create_session
+from tests.test_utils.db import clear_db_connections
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest
+
+TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")

Review comment:
       What do you mean by that? This is a system test; it will be skipped in CI.







[GitHub] [airflow] mik-laj commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475529841



##########
File path: airflow/providers/google/cloud/operators/dataprep.py
##########
@@ -51,6 +50,68 @@ def __init__(
 
     def execute(self, context: Dict):
         self.log.info("Fetching data for job with id: %d ...", self.job_id)
-        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id",)
         response = hook.get_jobs_for_job_group(job_id=self.job_id)
         return response
+
+
+class DataprepGetJobGroupOperator(BaseOperator):
+    """
+    Get the specified job group.
+    A job group is a job that is executed from a specific node in a flow.
+    API documentation https://clouddataprep.com/documentation/api#section/Overview
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataprepGetJobGroupOperator`
+
+    :param job_id The ID of the job that will be requests
+    :type job_id: int
+    :param embed Comma-separated list of objects to pull in as part of the response
+    :type embed: string
+    :param include_deleted if set to "true", will include deleted objects
+    :type include_deleted: bool
+    """
+
+    template_fields = ("job_id",)
+
+    @apply_defaults
+    def __init__(
+        self, *, job_id: int, embed: str, include_deleted: bool, **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+        self.job_id = job_id
+        self.embed = embed
+        self.include_deleted = include_deleted
+
+    def execute(self, context: Dict):
+        self.log.info("Fetching data for job with id: %d ...", self.job_id)
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        response = hook.get_job_group(job_id=self.job_id, embed="", include_deleted=self.include_deleted)

Review comment:
       embed?
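
To spell out the question: the call above passes `embed=""` even though the constructor stores `self.embed`. A minimal sketch of forwarding the stored value, assuming the hook signature shown in the diff; `FakeDataprepHook` and `GetJobGroupSketch` are hypothetical stand-ins that only echo their arguments.

```python
from typing import Any, Dict


class FakeDataprepHook:
    """Hypothetical stand-in for GoogleDataprepHook; it echoes its arguments."""

    def get_job_group(self, job_id: int, embed: str, include_deleted: bool) -> Dict[str, Any]:
        return {"job_id": job_id, "embed": embed, "include_deleted": include_deleted}


class GetJobGroupSketch:
    """Hypothetical stand-in for the operator under review."""

    def __init__(self, *, job_id: int, embed: str, include_deleted: bool) -> None:
        self.job_id = job_id
        self.embed = embed
        self.include_deleted = include_deleted

    def execute(self, hook: FakeDataprepHook) -> Dict[str, Any]:
        # Forward self.embed rather than a hardcoded empty string.
        return hook.get_job_group(
            job_id=self.job_id,
            embed=self.embed,
            include_deleted=self.include_deleted,
        )


result = GetJobGroupSketch(job_id=1, embed="jobs", include_deleted=False).execute(FakeDataprepHook())
```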







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475922760



##########
File path: tests/providers/google/cloud/operators/test_dataprep_system.py
##########
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from os import environ
+
+from airflow.models import Connection
+from airflow.utils.session import create_session
+from tests.test_utils.db import clear_db_connections
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest
+
+TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")
+EXTRA = {"token": TOKEN}
+
+
+class DataprepExampleDagsTest(GoogleSystemTest):

Review comment:
       But it's in the Google providers package, so we should be able to run it using `--system=google`. Also (I am not 100% sure), this system test may require some GCP resources to be easily repeatable.







[GitHub] [airflow] mik-laj commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475926656



##########
File path: tests/providers/google/cloud/operators/test_dataprep_system.py
##########
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from os import environ
+
+from airflow.models import Connection
+from airflow.utils.session import create_session
+from tests.test_utils.db import clear_db_connections
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest
+
+TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")
+EXTRA = {"token": TOKEN}
+
+
+class DataprepExampleDagsTest(GoogleSystemTest):

Review comment:
       Good point, but we should still detect missing credentials.







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r480968047



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -50,26 +55,63 @@ def _headers(self) -> Dict[str, str]:
         }
         return headers
 
-    @property
-    def _token(self) -> str:
-        conn = self.get_connection(self.dataprep_conn_id)
-        token = conn.extra_dejson.get("token")
-        if token is None:
-            raise AirflowException(
-                "Dataprep token is missing or has invalid format. "
-                "Please make sure that Dataprep token is added to the Airflow Connections."
-            )
-        return token
-
     @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
     def get_jobs_for_job_group(self, job_id: int) -> Dict[str, Any]:
         """
         Get information about the batch jobs within a Cloud Dataprep job.
 
-        :param job_id The ID of the job that will be fetched.
+        :param job_id The ID of the job that will be fetched
         :type job_id: int
         """
-        url: str = f"{self._url}/{job_id}/jobs"
+
+        endpoint_path = f"v4/jobGroups/{job_id}/jobs"
+        url: str = os.path.join(self._base_url, endpoint_path)
         response = requests.get(url, headers=self._headers)
-        response.raise_for_status()
+        self._raise_for_status(response)
+        return response.json()
+
+    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
+    def get_job_group(self, job_group_id: int, embed: str, include_deleted: bool) -> Dict[str, Any]:
+        """
+        Get the specified job group.
+        A job group is a job that is executed from a specific node in a flow.
+
+        :param job_group_id The ID of the job that will be fetched
+        :type job_group_id: int
+        :param embed Comma-separated list of objects to pull in as part of the response
+        :type embed: str
+        :param include_deleted if set to "true", will include deleted objects

Review comment:
       ```suggestion
           :param include_deleted: if set to "true", will include deleted objects
   ```







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r477223830



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -71,19 +62,20 @@ def get_jobs_for_job_group(self, job_id: int) -> Dict[str, Any]:
         :type job_id: int
         """
 
-        url: str = f"{self._url}/{job_id}/jobs"
+        endpoint_path = f"/v4/jobGroups/{job_id}/jobs"
+        url: str = f"{self._base_url}{endpoint_path}"

Review comment:
       ```suggestion
           endpoint_path = f"v4/jobGroups/{job_id}/jobs"
           url: str = os.path.join(self._base_url, endpoint_path)
   ```
   In this way we will handle `_base_url` with and without ending `/`
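
A minimal sketch of the joining behaviour referred to here, assuming POSIX path semantics: `posixpath.join` is what `os.path.join` resolves to on Linux and macOS, while on Windows `os.path.join` would insert a backslash. The base URL is the one used in the diff; the job ID 1234 is made up for illustration. The third case shows why the leading `/` is dropped from `endpoint_path` in the suggestion: a second argument that starts with `/` discards the base entirely.

```python
import posixpath

BASE = "https://api.clouddataprep.com"
job_id = 1234  # hypothetical ID, for illustration only

# Relative endpoint path, as in the suggestion above.
endpoint_path = f"v4/jobGroups/{job_id}/jobs"

# A base URL without and with a trailing slash gives the same result.
url_no_slash = posixpath.join(BASE, endpoint_path)
url_with_slash = posixpath.join(BASE + "/", endpoint_path)

# A leading slash makes the second argument "absolute", so the base is dropped.
url_leading_slash = posixpath.join(BASE, "/" + endpoint_path)

print(url_no_slash)
print(url_with_slash)
print(url_leading_slash)
```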







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r477224927



##########
File path: tests/conftest.py
##########
@@ -251,6 +253,9 @@ def pytest_configure(config):
     config.addinivalue_line(
         "markers", "airflow_2: mark tests that works only on Airflow 2.0 / master"
     )
+    config.addinivalue_line(
+        "markers", "credential_env(name): mark tests that require credential env in CREDENTIALS_ENV"
+    )

Review comment:
       I think it would be simpler to add
   ```
   @unittest.skipIf(os.environ.get("DATAPREP_TOKEN") is None, 'Dataprep token not present')
   ```
   WDYT?
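
A rough sketch of how such a skip could look in a test module, using only the standard library. `dataprep_token_missing` and `DataprepTokenDependentTest` are hypothetical names introduced for illustration and are not part of the PR; the suite is run programmatically here only so the example is self-contained.

```python
import os
import unittest


def dataprep_token_missing(env=os.environ) -> bool:
    """Return True when DATAPREP_TOKEN is not set in the given environment."""
    return env.get("DATAPREP_TOKEN") is None


# The whole class is skipped when the token is absent from the environment.
@unittest.skipIf(dataprep_token_missing(), "Dataprep token not present")
class DataprepTokenDependentTest(unittest.TestCase):  # hypothetical test class
    def test_token_is_available(self):
        self.assertIsNotNone(os.environ.get("DATAPREP_TOKEN"))


# Run the suite in-process; a skipped test still counts as a successful run.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(DataprepTokenDependentTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

With this approach no custom pytest marker needs to be registered, at the cost of repeating the condition in every test module that depends on the token.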







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r478221088



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -40,7 +41,10 @@ class GoogleDataprepHook(BaseHook):
     def __init__(self, dataprep_conn_id: str = "dataprep_conn_id") -> None:
         super().__init__()
         self.dataprep_conn_id = dataprep_conn_id
-        self._url = "https://api.clouddataprep.com/v4/jobGroups"
+        conn = self.get_connection(self.dataprep_conn_id)
+        extra_dejson = conn.extra_dejson
+        self._token = extra_dejson.get("token")

Review comment:
       Yes. Thank you.







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r476229277



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -66,10 +67,57 @@ def get_jobs_for_job_group(self, job_id: int) -> Dict[str, Any]:
         """
         Get information about the batch jobs within a Cloud Dataprep job.
 
-        :param job_id The ID of the job that will be fetched.
+        :param job_id The ID of the job that will be fetched
         :type job_id: int
         """
+
         url: str = f"{self._url}/{job_id}/jobs"
         response = requests.get(url, headers=self._headers)
         response.raise_for_status()
         return response.json()
+
+    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
+    def get_job_group(self, job_id: int, embed: str, include_deleted: bool) -> Dict[str, Any]:
+        """
+        Get the specified job group.
+        A job group is a job that is executed from a specific node in a flow.
+
+        :param job_id The ID of the job that will be fetched
+        :type job_id: int
+        :param embed Comma-separated list of objects to pull in as part of the response
+        :type embed: str
+        :param include_deleted if set to "true", will include deleted objects
+        :type include_deleted: bool
+        """
+
+        params: Dict[str, Any] = {
+            'embed': embed,
+            'includeDeleted': include_deleted
+        }
+
+        url: str = f"{self._url}/{job_id}"

Review comment:
       Discussed via Slack.







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r477608267



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -50,26 +54,60 @@ def _headers(self) -> Dict[str, str]:
         }
         return headers
 
-    @property
-    def _token(self) -> str:
-        conn = self.get_connection(self.dataprep_conn_id)
-        token = conn.extra_dejson.get("token")
-        if token is None:
-            raise AirflowException(
-                "Dataprep token is missing or has invalid format. "
-                "Please make sure that Dataprep token is added to the Airflow Connections."
-            )
-        return token
-
     @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
     def get_jobs_for_job_group(self, job_id: int) -> Dict[str, Any]:
         """
         Get information about the batch jobs within a Cloud Dataprep job.
 
-        :param job_id The ID of the job that will be fetched.
+        :param job_id The ID of the job that will be fetched
         :type job_id: int
         """
-        url: str = f"{self._url}/{job_id}/jobs"
+
+        endpoint_path = f"/v4/jobGroups/{job_id}/jobs"
+        url: str = os.path.join(self._base_url, endpoint_path)
         response = requests.get(url, headers=self._headers)
         response.raise_for_status()

Review comment:
       I remember there was a problem with error message propagation: only the status code was returned instead of the error message. Is this still an issue?
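
One possible way to surface the response body alongside the status code is sketched below. This is not the PR's `_raise_for_status` helper, whose body is not shown in this thread. `DataprepApiError`, `raise_with_body` and `FakeResponse` are hypothetical names, and the response object is only assumed to expose `status_code` and `text`, as `requests.Response` does.

```python
class DataprepApiError(Exception):
    """Hypothetical exception type used only in this sketch."""


def raise_with_body(response) -> None:
    """Raise an error that includes the response body, not only the status code."""
    if response.status_code >= 400:
        raise DataprepApiError(f"HTTP {response.status_code}: {response.text}")


class FakeResponse:
    """Stand-in for requests.Response so the sketch runs without network access."""

    def __init__(self, status_code: int, text: str) -> None:
        self.status_code = status_code
        self.text = text


# A successful response passes through without raising.
raise_with_body(FakeResponse(200, "{}"))

try:
    # Made-up error payload, for illustration only.
    raise_with_body(FakeResponse(400, '{"message": "bad request body"}'))
except DataprepApiError as err:
    error_text = str(err)
    print(error_text)
```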







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475390679



##########
File path: docs/howto/operator/google/cloud/dataprep.rst
##########
@@ -17,7 +17,25 @@
 
 Google Dataprep Operators
 =========================
-`Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
+Dataprep is the intelligent cloud data service to visually explore, clean, and prepare data for analysis and machine learning.
+Service can be use to explore and transform raw data from disparate and/or large datasets into clean and structured data for further analysis and processing.
+Dataprep Job is an internal object encoding the information necessary to run a part of a Cloud Dataprep job group.
+For more information about the service visit `Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
+
+Before using Dataprep within Airflow you need to authenticate your account with TOKEN.
+To get connection Dataprep with Airflow you need Dataprep token. Please follow Dataprep instructions to do it.
+https://clouddataprep.com/documentation/api#section/Authentication
+
+TOKEN should be added to the Connection in Airflow in JSON format.
+Her you can check how to do such connection:
+https://airflow.readthedocs.io/en/stable/howto/connection/index.html#editing-a-connection-with-the-ui
+
+See following example:
+Set values for these fields:
+.. code-block::
+
+  Conn Id: "your_conn_id"
+  Extra: {"token": "TOKEN"}

Review comment:
       Ok.







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r477608592



##########
File path: airflow/providers/google/cloud/operators/dataprep.py
##########
@@ -49,6 +48,68 @@ def __init__(self, *, job_id: int, **kwargs) -> None:
 
     def execute(self, context: Dict):
         self.log.info("Fetching data for job with id: %d ...", self.job_id)
-        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id",)
         response = hook.get_jobs_for_job_group(job_id=self.job_id)
         return response
+
+
+class DataprepGetJobGroupOperator(BaseOperator):
+    """
+    Get the specified job group.
+    A job group is a job that is executed from a specific node in a flow.
+    API documentation https://clouddataprep.com/documentation/api#section/Overview
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataprepGetJobGroupOperator`
+
+    :param job_group_id The ID of the job that will be requests
+    :type job_group_id: int
+    :param embed Comma-separated list of objects to pull in as part of the response
+    :type embed: string
+    :param include_deleted if set to "true", will include deleted objects
+    :type include_deleted: bool
+    """
+
+    template_fields = ("job_group_id",)
+
+    @apply_defaults
+    def __init__(
+        self, *, job_group_id: int, embed: str, include_deleted: bool, **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+        self.job_group_id = job_group_id
+        self.embed = embed
+        self.include_deleted = include_deleted
+
+    def execute(self, context: Dict):
+        self.log.info("Fetching data for job with id: %d ...", self.job_group_id)
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        response = hook.get_job_group(
+            job_group_id=self.job_group_id,
+            embed=self.embed,
+            include_deleted=self.include_deleted,
+        )
+        return response
+
+
+class DataprepRunJobGroupOperator(BaseOperator):
+    """
+    Create a ``jobGroup``, which launches the specified job as the authenticated user.
+    This performs the same action as clicking on the Run Job button in the application.
+    To get recipe_id please follow the Dataprep API documentation
+    https://clouddataprep.com/documentation/api#operation/runJobGroup
+
+    :param recipe_id: The identifier for the recipe you would like to run.
+    :type recipe_id: int
+    """
+

Review comment:
       ```suggestion
       template_fields = ("body_request",)
   ```







[GitHub] [airflow] mik-laj commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475681110



##########
File path: airflow/providers/google/cloud/operators/dataprep.py
##########
@@ -51,6 +50,68 @@ def __init__(
 
     def execute(self, context: Dict):
         self.log.info("Fetching data for job with id: %d ...", self.job_id)
-        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id",)
         response = hook.get_jobs_for_job_group(job_id=self.job_id)
         return response
+
+
+class DataprepGetJobGroupOperator(BaseOperator):
+    """
+    Get the specified job group.
+    A job group is a job that is executed from a specific node in a flow.
+    API documentation https://clouddataprep.com/documentation/api#section/Overview
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataprepGetJobGroupOperator`
+
+    :param job_id The ID of the job that will be requests
+    :type job_id: int
+    :param embed Comma-separated list of objects to pull in as part of the response
+    :type embed: string
+    :param include_deleted if set to "true", will include deleted objects
+    :type include_deleted: bool
+    """
+
+    template_fields = ("job_id",)
+
+    @apply_defaults
+    def __init__(
+        self, *, job_id: int, embed: str, include_deleted: bool, **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+        self.job_id = job_id
+        self.embed = embed
+        self.include_deleted = include_deleted
+
+    def execute(self, context: Dict):
+        self.log.info("Fetching data for job with id: %d ...", self.job_id)
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        response = hook.get_job_group(job_id=self.job_id, embed="", include_deleted=self.include_deleted)

Review comment:
       ```suggestion
           response = hook.get_job_group(job_id=self.job_id, embed=self.embed, include_deleted=self.include_deleted)
   ```







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r476228605



##########
File path: tests/providers/google/cloud/operators/test_dataprep_system.py
##########
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from os import environ
+
+from airflow.models import Connection
+from airflow.utils.session import create_session
+from tests.test_utils.db import clear_db_connections
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER
+from tests.test_utils.system_tests_class import SystemTest
+
+TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")
+EXTRA = {"token": TOKEN}
+
+
+class DataprepExampleDagsTest(SystemTest):

Review comment:
       I think it is resolved. 







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r476321745



##########
File path: airflow/providers/google/cloud/operators/dataprep.py
##########
@@ -51,6 +50,68 @@ def __init__(
 
     def execute(self, context: Dict):
         self.log.info("Fetching data for job with id: %d ...", self.job_id)
-        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id",)
         response = hook.get_jobs_for_job_group(job_id=self.job_id)
         return response
+
+
+class DataprepGetJobGroupOperator(BaseOperator):
+    """
+    Get the specified job group.
+    A job group is a job that is executed from a specific node in a flow.
+    API documentation https://clouddataprep.com/documentation/api#section/Overview
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataprepGetJobGroupOperator`
+
+    :param job_id The ID of the job that will be requests
+    :type job_id: int
+    :param embed Comma-separated list of objects to pull in as part of the response
+    :type embed: string
+    :param include_deleted if set to "true", will include deleted objects
+    :type include_deleted: bool
+    """
+
+    template_fields = ("job_id",)
+
+    @apply_defaults
+    def __init__(
+        self, *, job_id: int, embed: str, include_deleted: bool, **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+        self.job_id = job_id
+        self.embed = embed
+        self.include_deleted = include_deleted
+
+    def execute(self, context: Dict):
+        self.log.info("Fetching data for job with id: %d ...", self.job_id)
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        response = hook.get_job_group(job_id=self.job_id, embed="", include_deleted=self.include_deleted)

Review comment:
       Done.







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r472909294



##########
File path: tests/providers/google/cloud/operators/test_dataprep_system.py
##########
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from os import environ
+
+from airflow.models import Connection
+from airflow.utils.session import create_session
+from tests.test_utils.db import clear_db_connections
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER
+from tests.test_utils.system_tests_class import SystemTest
+
+TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")
+EXTRA = {"token": TOKEN}
+
+
+class DataprepExampleDagsTest(SystemTest):

Review comment:
       Assuming that we start with a clean Dataprep account, don't we need to create some pipelines first?







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475671572



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -66,10 +67,57 @@ def get_jobs_for_job_group(self, job_id: int) -> Dict[str, Any]:
         """
         Get information about the batch jobs within a Cloud Dataprep job.
 
-        :param job_id The ID of the job that will be fetched.
+        :param job_id The ID of the job that will be fetched
         :type job_id: int
         """
+
         url: str = f"{self._url}/{job_id}/jobs"
         response = requests.get(url, headers=self._headers)
         response.raise_for_status()
         return response.json()
+
+    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
+    def get_job_group(self, job_id: int, embed: str, include_deleted: bool) -> Dict[str, Any]:
+        """
+        Get the specified job group.
+        A job group is a job that is executed from a specific node in a flow.
+
+        :param job_id The ID of the job that will be fetched
+        :type job_id: int
+        :param embed Comma-separated list of objects to pull in as part of the response
+        :type embed: str
+        :param include_deleted if set to "true", will include deleted objects
+        :type include_deleted: bool
+        """
+
+        params: Dict[str, Any] = {
+            'embed': embed,
+            'includeDeleted': include_deleted
+        }
+
+        url: str = f"{self._url}/{job_id}"

Review comment:
       Do you mean a property, @mik-laj? Like this?
   
   ```
   @property
   def _base_url(self) -> str:
       conn = self.get_connection(self.dataprep_conn_id)
       url = conn.extra_dejson.get("base_url", "https://api.clouddataprep.com/")
       if url is None:
           raise AirflowException(
               "Dataprep base url is missing or has invalid format. "
               "Please make sure that Dataprep url is added to the Airflow Connections."
           )
       return url
   ```
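
A small sketch of how the lookup in the snippet above behaves, using a plain dict in place of `conn.extra_dejson` so that it runs without Airflow. `resolve_base_url` is a hypothetical helper and the override URL is made up. Because `.get()` is called with a default, the `None` branch is reached only when the extras contain an explicit null for `base_url`.

```python
DEFAULT_BASE_URL = "https://api.clouddataprep.com/"


def resolve_base_url(extra: dict) -> str:
    """Return the base URL from connection extras, falling back to the default."""
    url = extra.get("base_url", DEFAULT_BASE_URL)
    if url is None:
        # Reached only when the extras contain an explicit null for "base_url".
        raise ValueError("Dataprep base url is missing or has invalid format.")
    return url


print(resolve_base_url({}))  # falls back to the default
print(resolve_base_url({"base_url": "https://example.invalid/"}))  # made-up override
```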







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r478222323



##########
File path: tests/providers/google/cloud/operators/test_dataprep_system.py
##########
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from os import environ
+
+import pytest
+
+from airflow.models import Connection
+from airflow.utils.session import create_session
+from tests.test_utils.db import clear_db_connections
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest
+
+TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")
+EXTRA = {"token": TOKEN}

Review comment:
       Thank you.







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r480969304



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -50,26 +55,63 @@ def _headers(self) -> Dict[str, str]:
         }
         return headers
 
-    @property
-    def _token(self) -> str:
-        conn = self.get_connection(self.dataprep_conn_id)
-        token = conn.extra_dejson.get("token")
-        if token is None:
-            raise AirflowException(
-                "Dataprep token is missing or has invalid format. "
-                "Please make sure that Dataprep token is added to the Airflow Connections."
-            )
-        return token
-
     @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
     def get_jobs_for_job_group(self, job_id: int) -> Dict[str, Any]:
         """
         Get information about the batch jobs within a Cloud Dataprep job.
 
-        :param job_id The ID of the job that will be fetched.
+        :param job_id The ID of the job that will be fetched
         :type job_id: int
         """
-        url: str = f"{self._url}/{job_id}/jobs"
+
+        endpoint_path = f"v4/jobGroups/{job_id}/jobs"
+        url: str = os.path.join(self._base_url, endpoint_path)
         response = requests.get(url, headers=self._headers)
-        response.raise_for_status()
+        self._raise_for_status(response)
+        return response.json()
+
+    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
+    def get_job_group(self, job_group_id: int, embed: str, include_deleted: bool) -> Dict[str, Any]:
+        """
+        Get the specified job group.
+        A job group is a job that is executed from a specific node in a flow.
+
+        :param job_group_id The ID of the job that will be fetched
+        :type job_group_id: int
+        :param embed Comma-separated list of objects to pull in as part of the response
+        :type embed: str
+        :param include_deleted if set to "true", will include deleted objects
+        :type include_deleted: bool
+        """
+
+        params: Dict[str, Any] = {"embed": embed, "includeDeleted": include_deleted}
+        endpoint_path = f"v4/jobGroups/{job_group_id}"
+        url: str = os.path.join(self._base_url, endpoint_path)
+        response = requests.get(url, headers=self._headers, params=params)
+        self._raise_for_status(response)
+        return response.json()
+
+    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
+    def run_job_group(self, body_request: dict) -> Dict[str, Any]:
+        """
+        Creates a ``jobGroup``, which launches the specified job as the authenticated user.
+        This performs the same action as clicking on the Run Job button in the application.
+        To get recipe_id please follow the Dataprep API documentation
+        https://clouddataprep.com/documentation/api#operation/runJobGroup
+
+        :param body_request: The identifier for the recipe you would like to run.
+        :type body_request: dict
+        """
+
+        endpoint_path = "v4/jobGroups"
+        url: str = os.path.join(self._base_url, endpoint_path)
+        response = requests.post(url, headers=self._headers, data=json.dumps(body_request))
+        self._raise_for_status(response)
         return response.json()
+
+    def _raise_for_status(self, response):

Review comment:
       ```suggestion
       def _raise_for_status(self, response: requests.models.Response) -> None:
   ```







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r478211713



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -40,7 +41,10 @@ class GoogleDataprepHook(BaseHook):
     def __init__(self, dataprep_conn_id: str = "dataprep_conn_id") -> None:
         super().__init__()
         self.dataprep_conn_id = dataprep_conn_id
-        self._url = "https://api.clouddataprep.com/v4/jobGroups"
+        conn = self.get_connection(self.dataprep_conn_id)
+        extra_dejson = conn.extra_dejson
+        self._token = extra_dejson.get("token")
+        self._base_url = extra_dejson.get("base_url", "https://api.clouddataprep.com")

Review comment:
       Thanks.







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r477608830



##########
File path: airflow/providers/google/cloud/operators/dataprep.py
##########
@@ -49,6 +48,68 @@ def __init__(self, *, job_id: int, **kwargs) -> None:
 
     def execute(self, context: Dict):
         self.log.info("Fetching data for job with id: %d ...", self.job_id)
-        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id",)
         response = hook.get_jobs_for_job_group(job_id=self.job_id)
         return response
+
+
+class DataprepGetJobGroupOperator(BaseOperator):
+    """
+    Get the specified job group.
+    A job group is a job that is executed from a specific node in a flow.
+    API documentation https://clouddataprep.com/documentation/api#section/Overview
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataprepGetJobGroupOperator`
+
+    :param job_group_id The ID of the job that will be requests
+    :type job_group_id: int
+    :param embed Comma-separated list of objects to pull in as part of the response
+    :type embed: string
+    :param include_deleted if set to "true", will include deleted objects
+    :type include_deleted: bool
+    """
+
+    template_fields = ("job_group_id",)
+
+    @apply_defaults
+    def __init__(
+        self, *, job_group_id: int, embed: str, include_deleted: bool, **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+        self.job_group_id = job_group_id
+        self.embed = embed
+        self.include_deleted = include_deleted
+
+    def execute(self, context: Dict):
+        self.log.info("Fetching data for job with id: %d ...", self.job_group_id)
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        response = hook.get_job_group(
+            job_group_id=self.job_group_id,
+            embed=self.embed,
+            include_deleted=self.include_deleted,
+        )
+        return response
+
+
+class DataprepRunJobGroupOperator(BaseOperator):
+    """
+    Create a ``jobGroup``, which launches the specified job as the authenticated user.
+    This performs the same action as clicking on the Run Job button in the application.
+    To get recipe_id please follow the Dataprep API documentation
+    https://clouddataprep.com/documentation/api#operation/runJobGroup
+
+    :param recipe_id: The identifier for the recipe you would like to run.
+    :type recipe_id: int
+    """
+
+    def __init__(self, *, body_request: dict, **kwargs) -> None:
+        super().__init__(**kwargs)
+        self.body_request = body_request
+
+    def execute(self, context: None):
+        self.log.info("Creating a job...")
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")

Review comment:
       The connection should be configurable via operator interface

##########
File path: airflow/providers/google/cloud/operators/dataprep.py
##########
@@ -49,6 +48,68 @@ def __init__(self, *, job_id: int, **kwargs) -> None:
 
     def execute(self, context: Dict):
         self.log.info("Fetching data for job with id: %d ...", self.job_id)
-        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id",)
         response = hook.get_jobs_for_job_group(job_id=self.job_id)
         return response
+
+
+class DataprepGetJobGroupOperator(BaseOperator):
+    """
+    Get the specified job group.
+    A job group is a job that is executed from a specific node in a flow.
+    API documentation https://clouddataprep.com/documentation/api#section/Overview
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataprepGetJobGroupOperator`
+
+    :param job_group_id The ID of the job that will be requests
+    :type job_group_id: int
+    :param embed Comma-separated list of objects to pull in as part of the response
+    :type embed: string
+    :param include_deleted if set to "true", will include deleted objects
+    :type include_deleted: bool
+    """
+
+    template_fields = ("job_group_id",)
+
+    @apply_defaults
+    def __init__(
+        self, *, job_group_id: int, embed: str, include_deleted: bool, **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+        self.job_group_id = job_group_id
+        self.embed = embed
+        self.include_deleted = include_deleted
+
+    def execute(self, context: Dict):
+        self.log.info("Fetching data for job with id: %d ...", self.job_group_id)
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")

Review comment:
       The connection should be configurable via operator interface

##########
File path: airflow/providers/google/cloud/operators/dataprep.py
##########
@@ -49,6 +48,68 @@ def __init__(self, *, job_id: int, **kwargs) -> None:
 
     def execute(self, context: Dict):
         self.log.info("Fetching data for job with id: %d ...", self.job_id)
-        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id",)

Review comment:
       The connection should be configurable via operator interface
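
       A minimal sketch of what a configurable connection could look like. The classes below are stand-ins written for illustration only (they are not the real GoogleDataprepHook or BaseOperator), and the default value simply mirrors the string used in the diff above:

```python
from typing import Any, Dict, Optional


class StubDataprepHook:
    """Stand-in for GoogleDataprepHook; it only records the connection id."""

    def __init__(self, dataprep_conn_id: str = "dataprep_conn_id") -> None:
        self.dataprep_conn_id = dataprep_conn_id

    def get_jobs_for_job_group(self, job_id: int) -> Dict[str, Any]:
        # A real hook would call the Dataprep API here; the stub echoes its inputs.
        return {"job_id": job_id, "conn_id": self.dataprep_conn_id}


class GetJobsForJobGroupSketch:
    """Stand-in operator: the connection id is a constructor argument."""

    def __init__(self, *, job_id: int, dataprep_conn_id: str = "dataprep_conn_id") -> None:
        self.job_id = job_id
        self.dataprep_conn_id = dataprep_conn_id

    def execute(self, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        # Pass the configured id through instead of hard-coding it in execute().
        hook = StubDataprepHook(dataprep_conn_id=self.dataprep_conn_id)
        return hook.get_jobs_for_job_group(job_id=self.job_id)
```

       With this shape, a DAG author can point each task at a different Airflow connection without touching the operator code.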







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r478219795



##########
File path: airflow/providers/google/cloud/operators/dataprep.py
##########
@@ -49,6 +48,68 @@ def __init__(self, *, job_id: int, **kwargs) -> None:
 
     def execute(self, context: Dict):
         self.log.info("Fetching data for job with id: %d ...", self.job_id)
-        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id",)
         response = hook.get_jobs_for_job_group(job_id=self.job_id)
         return response
+
+
+class DataprepGetJobGroupOperator(BaseOperator):
+    """
+    Get the specified job group.
+    A job group is a job that is executed from a specific node in a flow.
+    API documentation https://clouddataprep.com/documentation/api#section/Overview
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataprepGetJobGroupOperator`
+
+    :param job_group_id The ID of the job that will be requests
+    :type job_group_id: int
+    :param embed Comma-separated list of objects to pull in as part of the response
+    :type embed: string
+    :param include_deleted if set to "true", will include deleted objects
+    :type include_deleted: bool
+    """
+
+    template_fields = ("job_group_id",)
+
+    @apply_defaults
+    def __init__(
+        self, *, job_group_id: int, embed: str, include_deleted: bool, **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+        self.job_group_id = job_group_id
+        self.embed = embed
+        self.include_deleted = include_deleted
+
+    def execute(self, context: Dict):
+        self.log.info("Fetching data for job with id: %d ...", self.job_group_id)
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        response = hook.get_job_group(
+            job_group_id=self.job_group_id,
+            embed=self.embed,
+            include_deleted=self.include_deleted,
+        )
+        return response
+
+
+class DataprepRunJobGroupOperator(BaseOperator):
+    """
+    Create a ``jobGroup``, which launches the specified job as the authenticated user.
+    This performs the same action as clicking on the Run Job button in the application.
+    To get recipe_id please follow the Dataprep API documentation
+    https://clouddataprep.com/documentation/api#operation/runJobGroup
+
+    :param recipe_id: The identifier for the recipe you would like to run.
+    :type recipe_id: int
+    """
+
+    def __init__(self, *, body_request: dict, **kwargs) -> None:
+        super().__init__(**kwargs)
+        self.body_request = body_request
+
+    def execute(self, context: None):
+        self.log.info("Creating a job...")
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")

Review comment:
       Yes, indeed. Thanks.

##########
File path: airflow/providers/google/cloud/operators/dataprep.py
##########
@@ -49,6 +48,68 @@ def __init__(self, *, job_id: int, **kwargs) -> None:
 
     def execute(self, context: Dict):
         self.log.info("Fetching data for job with id: %d ...", self.job_id)
-        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id",)
         response = hook.get_jobs_for_job_group(job_id=self.job_id)
         return response
+
+
+class DataprepGetJobGroupOperator(BaseOperator):
+    """
+    Get the specified job group.
+    A job group is a job that is executed from a specific node in a flow.
+    API documentation https://clouddataprep.com/documentation/api#section/Overview
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataprepGetJobGroupOperator`
+
+    :param job_group_id The ID of the job that will be requests
+    :type job_group_id: int
+    :param embed Comma-separated list of objects to pull in as part of the response
+    :type embed: string
+    :param include_deleted if set to "true", will include deleted objects
+    :type include_deleted: bool
+    """
+
+    template_fields = ("job_group_id",)
+
+    @apply_defaults
+    def __init__(
+        self, *, job_group_id: int, embed: str, include_deleted: bool, **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+        self.job_group_id = job_group_id
+        self.embed = embed
+        self.include_deleted = include_deleted
+
+    def execute(self, context: Dict):
+        self.log.info("Fetching data for job with id: %d ...", self.job_group_id)
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")

Review comment:
       Yes, indeed. Thanks.

##########
File path: airflow/providers/google/cloud/operators/dataprep.py
##########
@@ -49,6 +48,68 @@ def __init__(self, *, job_id: int, **kwargs) -> None:
 
     def execute(self, context: Dict):
         self.log.info("Fetching data for job with id: %d ...", self.job_id)
-        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id",)

Review comment:
       Yes, indeed. Thanks.







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r476229021



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -98,3 +99,24 @@ def get_job_group(self, job_id: int, embed: str, include_deleted: bool) -> Dict[
         response = requests.get(url, headers=self._headers, params=params)
         response.raise_for_status()
         return response.json()
+
+    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
+    def run_job_group(self, recipe_id: int) -> Dict[str, Any]:
+        """
+        Creates a jobGroup, which launches the specified job as the authenticated user.
+        This performs the same action as clicking on the Run Job button in the application.
+
+        :param recipe_id: o run a job, you just specify the recipe identifier (``wrangledDataset.id``).
+        If the job is successful, all defined outputs are generated,
+        as defined in the output object, publications, and writeSettings objects associated with the recipe.
+        To identify the ``wrangledDataset Id``, select the recipe icon in the UI flow view and
+        take the id shown in the URL, e.g. if the URL is /flows/10?recipe=7, the ``wrangledDataset`` Id is 7.
+        :type recipe_id: int.
+        """
+
+        recipe = {"wrangledDataset": {"id": recipe_id}}

Review comment:
       It's done and working. 







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r477609682



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -40,7 +41,10 @@ class GoogleDataprepHook(BaseHook):
     def __init__(self, dataprep_conn_id: str = "dataprep_conn_id") -> None:
         super().__init__()
         self.dataprep_conn_id = dataprep_conn_id
-        self._url = "https://api.clouddataprep.com/v4/jobGroups"
+        conn = self.get_connection(self.dataprep_conn_id)
+        extra_dejson = conn.extra_dejson
+        self._token = extra_dejson.get("token")
+        self._base_url = extra_dejson.get("base_url", "https://api.clouddataprep.com")

Review comment:
       ```suggestion
           self._base_url = extra_dejson.get("extra__dataprep__base_url", "https://api.clouddataprep.com")
   ```
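
       For illustration, a sketch of how the base URL might be resolved from the connection extras. The helper name is hypothetical, and falling back to the plain ``base_url`` key is an assumption added here, not part of the suggestion above:

```python
from typing import Any, Dict

DEFAULT_BASE_URL = "https://api.clouddataprep.com"


def resolve_base_url(extra: Dict[str, Any]) -> str:
    """Return the Dataprep base URL from a connection's extras dict."""
    # Prefer the prefixed key; the un-prefixed fallback is an assumption.
    for key in ("extra__dataprep__base_url", "base_url"):
        value = extra.get(key)
        if value:
            return value
    return DEFAULT_BASE_URL
```

       An empty extras dict then yields the public endpoint, while a connection that sets either key overrides it.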







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r476242621



##########
File path: docs/howto/operator/google/cloud/dataprep.rst
##########
@@ -17,7 +17,28 @@
 
 Google Dataprep Operators
 =========================
-`Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
+Dataprep is the intelligent cloud data service to visually explore, clean, and prepare data for analysis and machine learning.
+Service can be use to explore and transform raw data from disparate and/or large datasets into clean and structured data for further analysis and processing.
+Dataprep Job is an internal object encoding the information necessary to run a part of a Cloud Dataprep job group.
+For more information about the service visit `Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
+
+Before using Dataprep within Airflow you need to authenticate your account with TOKEN.
+To get connection Dataprep with Airflow you need Dataprep token. Please follow Dataprep instructions to do it.
+https://clouddataprep.com/documentation/api#section/Authentication

Review comment:
       Ok. 







[GitHub] [airflow] mik-laj commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475529435



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -66,10 +67,57 @@ def get_jobs_for_job_group(self, job_id: int) -> Dict[str, Any]:
         """
         Get information about the batch jobs within a Cloud Dataprep job.
 
-        :param job_id The ID of the job that will be fetched.
+        :param job_id The ID of the job that will be fetched
         :type job_id: int
         """
+
         url: str = f"{self._url}/{job_id}/jobs"
         response = requests.get(url, headers=self._headers)
         response.raise_for_status()
         return response.json()
+
+    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
+    def get_job_group(self, job_id: int, embed: str, include_deleted: bool) -> Dict[str, Any]:
+        """
+        Get the specified job group.
+        A job group is a job that is executed from a specific node in a flow.
+
+        :param job_id The ID of the job that will be fetched
+        :type job_id: int
+        :param embed Comma-separated list of objects to pull in as part of the response
+        :type embed: str
+        :param include_deleted if set to "true", will include deleted objects
+        :type include_deleted: bool
+        """
+
+        params: Dict[str, Any] = {
+            'embed': embed,
+            'includeDeleted': include_deleted
+        }
+
+        url: str = f"{self._url}/{job_id}"

Review comment:
       You should build these URLs from a base_url, which should be configurable and default to ``https://api.clouddataprep.com``, plus an endpoint path, in this case ``v4/jobGroups``.
   ```
           self._base_url = self.conn.extra_dejson.get('base_url', 'https://api.clouddataprep.com')
   ```
   ```
           endpoint_path = f'v4/jobGroups/{job_group_id}'
           url: str = f"{self._base_url}/{endpoint_path}"
   ```
   Otherwise, it's very confusing, as it's hard to figure out which endpoint you are using.
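
       A small sketch of the base URL plus endpoint path idea. The helper name and the job group id are hypothetical; the sketch joins with plain string handling rather than ``os.path.join``, since that function uses the operating system's path separator and is not meant for URLs:

```python
def build_url(base_url: str, endpoint_path: str) -> str:
    """Join a base URL and an endpoint path with exactly one slash."""
    return f"{base_url.rstrip('/')}/{endpoint_path.lstrip('/')}"


DEFAULT_BASE_URL = "https://api.clouddataprep.com"
job_group_id = 7  # hypothetical id, for illustration only
url = build_url(DEFAULT_BASE_URL, f"v4/jobGroups/{job_group_id}")
```

       Stripping the slashes on both sides means a trailing slash in the configured base URL, or a leading slash in the endpoint path, does not produce a doubled separator.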







[GitHub] [airflow] mik-laj commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475531595



##########
File path: docs/howto/operator/google/cloud/dataprep.rst
##########
@@ -17,7 +17,28 @@
 
 Google Dataprep Operators
 =========================
-`Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
+Dataprep is the intelligent cloud data service to visually explore, clean, and prepare data for analysis and machine learning.
+Service can be use to explore and transform raw data from disparate and/or large datasets into clean and structured data for further analysis and processing.
+Dataprep Job is an internal object encoding the information necessary to run a part of a Cloud Dataprep job group.
+For more information about the service visit `Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
+
+Before using Dataprep within Airflow you need to authenticate your account with TOKEN.
+To get connection Dataprep with Airflow you need Dataprep token. Please follow Dataprep instructions to do it.
+https://clouddataprep.com/documentation/api#section/Authentication
+
+TOKEN should be added to the Connection in Airflow in JSON format.
+Her you can check how to do such connection:
+https://airflow.readthedocs.io/en/stable/howto/connection/index.html#editing-a-connection-with-the-ui

Review comment:
       Please use meaningful link text.
   https://developers.google.com/style/cross-references







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r477607766



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -40,7 +41,10 @@ class GoogleDataprepHook(BaseHook):
     def __init__(self, dataprep_conn_id: str = "dataprep_conn_id") -> None:
         super().__init__()
         self.dataprep_conn_id = dataprep_conn_id
-        self._url = "https://api.clouddataprep.com/v4/jobGroups"
+        conn = self.get_connection(self.dataprep_conn_id)
+        extra_dejson = conn.extra_dejson
+        self._token = extra_dejson.get("token")
+        self._base_url = extra_dejson.get("base_url", "https://api.clouddataprep.com")

Review comment:
       Nice!







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r477259975



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -71,19 +62,20 @@ def get_jobs_for_job_group(self, job_id: int) -> Dict[str, Any]:
         :type job_id: int
         """
 
-        url: str = f"{self._url}/{job_id}/jobs"
+        endpoint_path = f"/v4/jobGroups/{job_id}/jobs"
+        url: str = f"{self._base_url}{endpoint_path}"

Review comment:
       Nice. 
   I like it. 







[GitHub] [airflow] turbaszek commented on pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#issuecomment-675314760


   @michalslowikowski00 let me know once you are done. Btw. do the system tests work?





[GitHub] [airflow] turbaszek commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r480968203



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -50,26 +55,63 @@ def _headers(self) -> Dict[str, str]:
         }
         return headers
 
-    @property
-    def _token(self) -> str:
-        conn = self.get_connection(self.dataprep_conn_id)
-        token = conn.extra_dejson.get("token")
-        if token is None:
-            raise AirflowException(
-                "Dataprep token is missing or has invalid format. "
-                "Please make sure that Dataprep token is added to the Airflow Connections."
-            )
-        return token
-
     @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
     def get_jobs_for_job_group(self, job_id: int) -> Dict[str, Any]:
         """
         Get information about the batch jobs within a Cloud Dataprep job.
 
-        :param job_id The ID of the job that will be fetched.
+        :param job_id The ID of the job that will be fetched

Review comment:
       ```suggestion
           :param job_id: The ID of the job that will be fetched
   ```







[GitHub] [airflow] michalslowikowski00 edited a comment on pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 edited a comment on pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#issuecomment-675322492


   @turbaszek 
   
   I didn't check the system tests. I will do it.





[GitHub] [airflow] turbaszek commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r477627256



##########
File path: tests/providers/google/cloud/operators/test_dataprep_system.py
##########
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from os import environ
+
+import pytest
+
+from airflow.models import Connection
+from airflow.utils.session import create_session
+from tests.test_utils.db import clear_db_connections
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest
+
+TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")
+EXTRA = {"token": TOKEN}
+
+
+@pytest.mark.skipif(environ.get("DATAPREP_TOKEN") is None, 'Dataprep token not present')

Review comment:
       Also when running the system test I got the following error:
   ```
   ERROR    airflow.providers.google.cloud.hooks.dataprep.GoogleDataprepHook:dataprep.py:62 {'name': 'DataNotFoundException', 'message': 'Requested data was not found', 'details': 'This dataset could not be found.'}
   ERROR    airflow.models.taskinstance.TaskInstance:taskinstance.py:1310 404 Client Error: Not Found for url: https://api.clouddataprep.com/v4/jobGroups
   ```
   Does this system test require the developer to configure anything more than a Dataprep token?







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r477275269



##########
File path: tests/conftest.py
##########
@@ -251,6 +253,9 @@ def pytest_configure(config):
     config.addinivalue_line(
         "markers", "airflow_2: mark tests that works only on Airflow 2.0 / master"
     )
+    config.addinivalue_line(
+        "markers", "credential_env(name): mark tests that require credential env in CREDENTIALS_ENV"
+    )

Review comment:
       It's simpler, indeed.
   Do you think that my solution is over-engineering? I like it because it gives the user an explicit message and it's reusable.







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r474802652



##########
File path: docs/howto/operator/google/cloud/dataprep.rst
##########
@@ -17,7 +17,25 @@
 
 Google Dataprep Operators
 =========================
-`Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
+Dataprep is the intelligent cloud data service to visually explore, clean, and prepare data for analysis and machine learning.
+Service can be use to explore and transform raw data from disparate and/or large datasets into clean and structured data for further analysis and processing.
+Dataprep Job is an internal object encoding the information necessary to run a part of a Cloud Dataprep job group.
+For more information about the service visit `Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
+
+Before using Dataprep within Airflow you need to authenticate your account with TOKEN.
+To get connection Dataprep with Airflow you need Dataprep token. Please follow Dataprep instructions to do it.
+https://clouddataprep.com/documentation/api#section/Authentication
+
+TOKEN should be added to the Connection in Airflow in JSON format.
+Her you can check how to do such connection:
+https://airflow.readthedocs.io/en/stable/howto/connection/index.html#editing-a-connection-with-the-ui
+
+See following example:
+Set values for these fields:
+.. code-block::
+
+  Conn Id: "your_conn_id"
+  Extra: {"token": "TOKEN"}

Review comment:
       We should change this to
   ```suggestion
     Extra: {"extra__dataprep__token": "TOKEN"}
   ```
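As a rough illustration of the suggestion above, the sketch below serializes the token into the JSON string stored in a connection's Extra field and reads it back. The key name `extra__dataprep__token` is taken from the suggestion; the helper names `build_extra` and `read_token` are hypothetical and are not part of Airflow.

```python
import json


def build_extra(token: str) -> str:
    """Serialize the token into the JSON string kept in the connection's Extra field."""
    return json.dumps({"extra__dataprep__token": token})


def read_token(extra: str) -> str:
    """Read the token back out of the Extra JSON (hypothetical helper)."""
    return json.loads(extra)["extra__dataprep__token"]


extra = build_extra("TOKEN")
print(extra)
print(read_token(extra))
```

Keeping the key in one place like this would make it easier to rename later without touching every caller.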







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r476577587



##########
File path: tests/providers/google/cloud/operators/test_dataprep_system.py
##########
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from os import environ
+
+from airflow.models import Connection
+from airflow.utils.session import create_session
+from tests.test_utils.db import clear_db_connections
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest
+
+TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")
+EXTRA = {"token": TOKEN}
+
+
+class DataprepExampleDagsTest(GoogleSystemTest):

Review comment:
       The test will fail if there's no real token, and developers should be aware of that. You should check whether the `DATAPREP_TOKEN` environment variable is set and, if it is not, skip the test with a useful message.
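One possible shape for such a guard, sketched with the standard library's `unittest` skip decorator rather than the project's own pytest markers. The class name `DataprepSystemTestSketch` and the test body are hypothetical; only the `DATAPREP_TOKEN` variable name comes from the diff.

```python
import os
import unittest

TOKEN_ENV = "DATAPREP_TOKEN"


# Skip the whole class, with an explicit reason, when the token is not configured.
@unittest.skipUnless(
    os.environ.get(TOKEN_ENV),
    f"{TOKEN_ENV} is not set; skipping Dataprep system test",
)
class DataprepSystemTestSketch(unittest.TestCase):
    def test_token_is_available(self):
        # Placeholder body: a real system test would run the example DAG here.
        self.assertTrue(os.environ[TOKEN_ENV])
```

Because the reason string names the missing variable, a developer who sees the skip in the test report knows what to configure.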







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r476319915



##########
File path: docs/howto/operator/google/cloud/dataprep.rst
##########
@@ -17,7 +17,28 @@
 
 Google Dataprep Operators
 =========================
-`Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
+Dataprep is the intelligent cloud data service to visually explore, clean, and prepare data for analysis and machine learning.
+Service can be use to explore and transform raw data from disparate and/or large datasets into clean and structured data for further analysis and processing.
+Dataprep Job is an internal object encoding the information necessary to run a part of a Cloud Dataprep job group.
+For more information about the service visit `Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
+
+Before using Dataprep within Airflow you need to authenticate your account with TOKEN.

Review comment:
       Yes. 







[GitHub] [airflow] turbaszek merged pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
turbaszek merged pull request #10304:
URL: https://github.com/apache/airflow/pull/10304


   





[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475511095



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -98,3 +99,24 @@ def get_job_group(self, job_id: int, embed: str, include_deleted: bool) -> Dict[
         response = requests.get(url, headers=self._headers, params=params)
         response.raise_for_status()
         return response.json()
+
+    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
+    def run_job_group(self, recipe_id: int) -> Dict[str, Any]:
+        """
+        Creates a jobGroup, which launches the specified job as the authenticated user.
+        This performs the same action as clicking on the Run Job button in the application.
+
+        :param recipe_id: o run a job, you just specify the recipe identifier (``wrangledDataset.id``).
+        If the job is successful, all defined outputs are generated,
+        as defined in the output object, publications, and writeSettings objects associated with the recipe.
+        To identify the ``wrangledDataset Id``, select the recipe icon in the UI flow view and
+        take the id shown in the URL, e.g. if the URL is /flows/10?recipe=7, the ``wrangledDataset`` Id is 7.
+        :type recipe_id: int.
+        """
+
+        recipe = {"wrangledDataset": {"id": recipe_id}}

Review comment:
       Yes, I am aware of that. I will try to use this example and run the DAG with these params. I am still confused when I try to use Dataprep.







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475506331



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -82,11 +83,11 @@ def get_job_group(self, job_id: int, embed: str, include_deleted: bool) -> Dict[
         A job group is a job that is executed from a specific node in a flow.
 
         :param job_id The ID of the job that will be fetched.
-        :type job_id: int
+        :type job_id: int.
         :param embed Comma-separated list of objects to pull in as part of the response.
-        :type embed: string
+        :type embed: string.
         :param include_deleted if set to "true", will include deleted objects.
-        :type include_deleted: bool
+        :type include_deleted: bool.

Review comment:
       I saw documentation with full stops and I was wondering if I should add full stops in mine.
   Ok, I will remove full stops.







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r476347117



##########
File path: tests/providers/google/cloud/operators/test_dataprep_system.py
##########
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from os import environ
+
+from airflow.models import Connection
+from airflow.utils.session import create_session
+from tests.test_utils.db import clear_db_connections
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest
+
+TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")
+EXTRA = {"token": TOKEN}
+
+
+class DataprepExampleDagsTest(GoogleSystemTest):

Review comment:
       I do not understand what the problem is here.







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r472958287



##########
File path: tests/providers/google/cloud/operators/test_dataprep_system.py
##########
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from os import environ
+
+from airflow.models import Connection
+from airflow.utils.session import create_session
+from tests.test_utils.db import clear_db_connections
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER
+from tests.test_utils.system_tests_class import SystemTest
+
+TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")
+EXTRA = {"token": TOKEN}
+
+
+class DataprepExampleDagsTest(SystemTest):

Review comment:
       Ok, so I will add a new operator in this PR. Right away, sir. :)







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r472953588



##########
File path: tests/providers/google/cloud/operators/test_dataprep_system.py
##########
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from os import environ
+
+from airflow.models import Connection
+from airflow.utils.session import create_session
+from tests.test_utils.db import clear_db_connections
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER
+from tests.test_utils.system_tests_class import SystemTest
+
+TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")
+EXTRA = {"token": TOKEN}
+
+
+class DataprepExampleDagsTest(SystemTest):

Review comment:
       These ops are based on a single pipeline of mine. The next op that I am going to write will create a pipeline.
   This is the endpoint: https://clouddataprep.com/documentation/api#operation/runJobGroup. I think creating the pipeline could then be a step in `setUp()`. Also, I can wait with this PR until I finish the new PR that creates the pipeline. WDYT?
   After that, the system test will be complete. WDYT?
   Or I can add the system test after all the PRs. :)
   







[GitHub] [airflow] michalslowikowski00 commented on pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#issuecomment-675322492


   > @michalslowikowski00 let me know once you are done. Btw. do the system tests work?
   
   I didn't check the system tests. I will do it.





[GitHub] [airflow] turbaszek commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475461704



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -82,11 +83,11 @@ def get_job_group(self, job_id: int, embed: str, include_deleted: bool) -> Dict[
         A job group is a job that is executed from a specific node in a flow.
 
         :param job_id The ID of the job that will be fetched.
-        :type job_id: int
+        :type job_id: int.

Review comment:
       ```suggestion
           :type job_id: int
   ```







[GitHub] [airflow] mik-laj commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475531399



##########
File path: docs/howto/operator/google/cloud/dataprep.rst
##########
@@ -17,7 +17,28 @@
 
 Google Dataprep Operators
 =========================
-`Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
+Dataprep is the intelligent cloud data service to visually explore, clean, and prepare data for analysis and machine learning.
+Service can be use to explore and transform raw data from disparate and/or large datasets into clean and structured data for further analysis and processing.
+Dataprep Job is an internal object encoding the information necessary to run a part of a Cloud Dataprep job group.
+For more information about the service visit `Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
+
+Before using Dataprep within Airflow you need to authenticate your account with TOKEN.
+To get connection Dataprep with Airflow you need Dataprep token. Please follow Dataprep instructions to do it.
+https://clouddataprep.com/documentation/api#section/Authentication

Review comment:
       Please use meaningful link text.
   https://developers.google.com/style/cross-references







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475509378



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -82,11 +83,11 @@ def get_job_group(self, job_id: int, embed: str, include_deleted: bool) -> Dict[
         A job group is a job that is executed from a specific node in a flow.
 
         :param job_id The ID of the job that will be fetched.
-        :type job_id: int
+        :type job_id: int.
         :param embed Comma-separated list of objects to pull in as part of the response.
-        :type embed: string
+        :type embed: string.
         :param include_deleted if set to "true", will include deleted objects.
-        :type include_deleted: bool
+        :type include_deleted: bool.

Review comment:
       Looking for `:type .*: .* \.$` in `airflow/providers` returns no records
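A small sketch of this kind of check using Python's `re` module. Note that the pattern as quoted has a literal space before the final `\.`, so it does not match a line such as `:type job_id: int.`; the second pattern, `TRAILING_STOP`, is a hypothetical variant that flags any `:type` line ending in a full stop.

```python
import re

# Pattern as quoted in the comment: note the literal space before the final "\.".
QUOTED = re.compile(r":type .*: .* \.$")
# Hypothetical variant that flags any ":type" line ending in a full stop.
TRAILING_STOP = re.compile(r":type \w+: .*\.$")

lines = [
    ":type job_id: int.",
    ":type embed: string",
]

for line in lines:
    # Print whether each pattern finds a match on the docstring line.
    print(line, bool(QUOTED.search(line)), bool(TRAILING_STOP.search(line)))
```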







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475461014



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -98,3 +99,24 @@ def get_job_group(self, job_id: int, embed: str, include_deleted: bool) -> Dict[
         response = requests.get(url, headers=self._headers, params=params)
         response.raise_for_status()
         return response.json()
+
+    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
+    def run_job_group(self, recipe_id: int) -> Dict[str, Any]:
+        """
+        Creates a jobGroup, which launches the specified job as the authenticated user.
+        This performs the same action as clicking on the Run Job button in the application.
+
+        :param recipe_id: o run a job, you just specify the recipe identifier (``wrangledDataset.id``).
+        If the job is successful, all defined outputs are generated,
+        as defined in the output object, publications, and writeSettings objects associated with the recipe.
+        To identify the ``wrangledDataset Id``, select the recipe icon in the UI flow view and
+        take the id shown in the URL, e.g. if the URL is /flows/10?recipe=7, the ``wrangledDataset`` Id is 7.
+        :type recipe_id: int.
+        """
+
+        recipe = {"wrangledDataset": {"id": recipe_id}}

Review comment:
       I had the impression that this payload can be much more complicated. At least that is what I think when looking at the documentation, for example:
   ```
   {
     "wrangledDataset": {"id": 1},
     "overrides": {
       "execution": "spark",
       "profiler": false,
       "writesettings": [
         {
           "path": "<path_to_output_file>",
           "action": "create",
           "format": "csv",
           "compression": "none",
           "header": false,
           "asSingleFile": false
         }
       ]
     }
   }
   ```
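One way to leave room for the richer payload is to build the request body from the recipe id plus an optional overrides dictionary. This is only a sketch: `build_run_job_group_body` and its `overrides` parameter are hypothetical names, not part of the hook in this PR, and the field names are copied from the example above.

```python
from typing import Any, Dict, Optional


def build_run_job_group_body(
    recipe_id: int, overrides: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Build the JSON body for running a job group (hypothetical helper)."""
    body: Dict[str, Any] = {"wrangledDataset": {"id": recipe_id}}
    if overrides:
        # Only include the key when the caller supplies overrides.
        body["overrides"] = overrides
    return body


# Minimal payload, as in the diff under review.
print(build_run_job_group_body(7))
# Payload with a subset of the overrides shown in the documentation example.
print(build_run_job_group_body(1, {"execution": "spark", "profiler": False}))
```

An alternative with the same effect would be for the hook method to accept the whole body as a dictionary and pass it through unchanged.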







[GitHub] [airflow] mik-laj commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475530514



##########
File path: airflow/providers/google/cloud/operators/dataprep.py
##########
@@ -51,6 +50,68 @@ def __init__(
 
     def execute(self, context: Dict):
         self.log.info("Fetching data for job with id: %d ...", self.job_id)
-        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id",)
         response = hook.get_jobs_for_job_group(job_id=self.job_id)
         return response
+
+
+class DataprepGetJobGroupOperator(BaseOperator):
+    """
+    Get the specified job group.
+    A job group is a job that is executed from a specific node in a flow.
+    API documentation https://clouddataprep.com/documentation/api#section/Overview
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataprepGetJobGroupOperator`
+
+    :param job_id The ID of the job that will be requests
+    :type job_id: int
+    :param embed Comma-separated list of objects to pull in as part of the response
+    :type embed: string
+    :param include_deleted if set to "true", will include deleted objects
+    :type include_deleted: bool
+    """
+
+    template_fields = ("job_id",)
+
+    @apply_defaults
+    def __init__(
+        self, *, job_id: int, embed: str, include_deleted: bool, **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+        self.job_id = job_id
+        self.embed = embed
+        self.include_deleted = include_deleted
+
+    def execute(self, context: Dict):
+        self.log.info("Fetching data for job with id: %d ...", self.job_id)
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        response = hook.get_job_group(job_id=self.job_id, embed="", include_deleted=self.include_deleted)
+        return response
+
+
+class DataprepRunJobGroupOperator(BaseOperator):
+    """
+    Create a ``jobGroup``, which launches the specified job as the authenticated user.
+    This performs the same action as clicking on the Run Job button in the application.
+
+    :param recipe_id: to run a job, you just specify the recipe identifier (``wrangledDataset.id``).
+        If the job is successful, all defined outputs are generated,
+        as defined in the output object, publications, and ``writeSettings`` objects
+        associated with the recipe.
+        To identify the ``wrangledDataset Id``, select the recipe icon in the UI flow view and
+        take the id shown in the URL. e.g. if the URL is /flows/10?recipe=7,

Review comment:
       The API for the operator should not mention endpoint names in parameter descriptions. We should use operator/hook method names instead.







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475673388



##########
File path: airflow/providers/google/cloud/operators/dataprep.py
##########
@@ -51,6 +50,68 @@ def __init__(
 
     def execute(self, context: Dict):
         self.log.info("Fetching data for job with id: %d ...", self.job_id)
-        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id",)
         response = hook.get_jobs_for_job_group(job_id=self.job_id)
         return response
+
+
+class DataprepGetJobGroupOperator(BaseOperator):
+    """
+    Get the specified job group.
+    A job group is a job that is executed from a specific node in a flow.
+    API documentation https://clouddataprep.com/documentation/api#section/Overview
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataprepGetJobGroupOperator`
+
+    :param job_id The ID of the job that will be requests
+    :type job_id: int
+    :param embed Comma-separated list of objects to pull in as part of the response
+    :type embed: string
+    :param include_deleted if set to "true", will include deleted objects
+    :type include_deleted: bool
+    """
+
+    template_fields = ("job_id",)
+
+    @apply_defaults
+    def __init__(
+        self, *, job_id: int, embed: str, include_deleted: bool, **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+        self.job_id = job_id
+        self.embed = embed
+        self.include_deleted = include_deleted
+
+    def execute(self, context: Dict):
+        self.log.info("Fetching data for job with id: %d ...", self.job_id)
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        response = hook.get_job_group(job_id=self.job_id, embed="", include_deleted=self.include_deleted)

Review comment:
       Yes. Just like in the documentation.
   
   `embed | string
   Example: embed=association.otherAssociation,anotherAssociation
   Comma-separated list of objects to pull in as part of the response. See Embedding Resources for more information.
   `
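To make the comma-separated format concrete, here is a sketch of how the query parameters might be assembled before being handed to `requests.get(..., params=params)`. The helper `build_job_group_params` is hypothetical, and the query parameter name `includeDeleted` is an assumption that should be checked against the Dataprep API documentation.

```python
from typing import Any, Dict, List


def build_job_group_params(embed_objects: List[str], include_deleted: bool) -> Dict[str, Any]:
    """Assemble query parameters for fetching a job group (hypothetical helper)."""
    return {
        # The API expects one comma-separated string, not a list.
        "embed": ",".join(embed_objects),
        # Assumed parameter name; verify against the API reference.
        "includeDeleted": include_deleted,
    }


params = build_job_group_params(
    ["association.otherAssociation", "anotherAssociation"], include_deleted=False
)
print(params)
```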
   
   
   







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475511095



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -98,3 +99,24 @@ def get_job_group(self, job_id: int, embed: str, include_deleted: bool) -> Dict[
         response = requests.get(url, headers=self._headers, params=params)
         response.raise_for_status()
         return response.json()
+
+    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
+    def run_job_group(self, recipe_id: int) -> Dict[str, Any]:
+        """
+        Creates a jobGroup, which launches the specified job as the authenticated user.
+        This performs the same action as clicking on the Run Job button in the application.
+
+        :param recipe_id: o run a job, you just specify the recipe identifier (``wrangledDataset.id``).
+        If the job is successful, all defined outputs are generated,
+        as defined in the output object, publications, and writeSettings objects associated with the recipe.
+        To identify the ``wrangledDataset Id``, select the recipe icon in the UI flow view and
+        take the id shown in the URL, e.g. if the URL is /flows/10?recipe=7, the ``wrangledDataset`` Id is 7.
+        :type recipe_id: int.
+        """
+
+        recipe = {"wrangledDataset": {"id": recipe_id}}

Review comment:
       Yes, I am aware of that. I will try to use this example and run the DAG with these params. I am still confused when trying to use Dataprep.
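A minimal sketch of the request body discussed here, assuming the `{"wrangledDataset": {"id": ...}}` structure from the hook diff above. `build_run_job_group_body` is a hypothetical helper, not part of the PR.

```python
import json


def build_run_job_group_body(recipe_id: int) -> dict:
    """Wrap a recipe id in the structure shown in the hook diff (hypothetical helper)."""
    return {"wrangledDataset": {"id": recipe_id}}


# For a flow URL such as /flows/10?recipe=7, the recipe id is 7.
body = build_run_job_group_body(7)
# The later revision of the hook serializes the body with json.dumps before posting it.
payload = json.dumps(body)
print(payload)
```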







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475922760



##########
File path: tests/providers/google/cloud/operators/test_dataprep_system.py
##########
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from os import environ
+
+from airflow.models import Connection
+from airflow.utils.session import create_session
+from tests.test_utils.db import clear_db_connections
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest
+
+TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")
+EXTRA = {"token": TOKEN}
+
+
+class DataprepExampleDagsTest(GoogleSystemTest):

Review comment:
       But it's in the google providers package, so we should be able to run it using `--system=google`







[GitHub] [airflow] mik-laj commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475525556



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -56,7 +57,7 @@ def _token(self) -> str:
         token = conn.extra_dejson.get("token")
         if token is None:
             raise AirflowException(
-                "Dataprep token is missing or has invalid format. "
+                "Dataprep token is missing or has invalid format."

Review comment:
       I think a space is expected here, since this literal is concatenated with the next one.
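A short illustration of why the trailing space matters: adjacent string literals in Python are concatenated as written, so dropping the space glues the two sentences together. The message text is the one from the hook diff quoted in this thread.

```python
# Adjacent string literals inside parentheses are joined at compile time.
with_space = (
    "Dataprep token is missing or has invalid format. "
    "Please make sure that Dataprep token is added to the Airflow Connections."
)
without_space = (
    "Dataprep token is missing or has invalid format."
    "Please make sure that Dataprep token is added to the Airflow Connections."
)

print(with_space)
print(without_space)
```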







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r478231596



##########
File path: docs/howto/operator/google/cloud/dataprep.rst
##########
@@ -17,7 +17,29 @@
 
 Google Dataprep Operators
 =========================
-`Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
+Dataprep is the intelligent cloud data service to visually explore, clean, and prepare data for analysis and machine learning.
+Service can be use to explore and transform raw data from disparate and/or large datasets into clean and structured data for further analysis and processing.
+Dataprep Job is an internal object encoding the information necessary to run a part of a Cloud Dataprep job group.
+For more information about the service visit `Google Dataprep API documentation <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`_
+
+Before you begin
+^^^^^^^^^^^^^^^^
+Before using Dataprep within Airflow you need to authenticate your account with TOKEN.
+To get connection Dataprep with Airflow you need Dataprep token. Please follow Dataprep `instructions <https://clouddataprep.com/documentation/api#section/Authentication>`_ to do it.
+
+TOKEN should be added to the Connection in Airflow in JSON format.
+You can check `how to do such connection <https://airflow.readthedocs.io/en/stable/howto/connection/index.html#editing-a-connection-with-the-ui>`_.
+
+The DataprepRunJobGroupOperator will run specified job. Operator required a recipe id. To identify the recipe id please use `API documentation for runJobGroup <https://clouddataprep.com/documentation/api#operation/runJobGroup>`_
+E.g. if the URL is /flows/10?recipe=7, the recipe id is 7. The recipe cannot be created via this operator. It can be created only via UI which is available `here <https://clouddataprep.com/>`_.
+Some of parameters can be override by DAG's body request. How to do it is shown in example dag.
+
+See following example:
+Set values for these fields:
+.. code-block::
+
+  Conn Id: "your_conn_id"
+  Extra: {"extra__dataprep__token": "TOKEN"}

Review comment:
       Yes, thank you.







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r478221495



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -40,7 +41,10 @@ class GoogleDataprepHook(BaseHook):
     def __init__(self, dataprep_conn_id: str = "dataprep_conn_id") -> None:
         super().__init__()
         self.dataprep_conn_id = dataprep_conn_id
-        self._url = "https://api.clouddataprep.com/v4/jobGroups"
+        conn = self.get_connection(self.dataprep_conn_id)
+        extra_dejson = conn.extra_dejson
+        self._token = extra_dejson.get("token")
+        self._base_url = extra_dejson.get("base_url", "https://api.clouddataprep.com")

Review comment:
       Thank you.







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475462023



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -82,11 +83,11 @@ def get_job_group(self, job_id: int, embed: str, include_deleted: bool) -> Dict[
         A job group is a job that is executed from a specific node in a flow.
 
         :param job_id The ID of the job that will be fetched.
-        :type job_id: int
+        :type job_id: int.
         :param embed Comma-separated list of objects to pull in as part of the response.
-        :type embed: string
+        :type embed: string.

Review comment:
       ```suggestion
           :type embed: str
   ```
   There's no `string` type 







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r478494815



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -50,26 +54,60 @@ def _headers(self) -> Dict[str, str]:
         }
         return headers
 
-    @property
-    def _token(self) -> str:
-        conn = self.get_connection(self.dataprep_conn_id)
-        token = conn.extra_dejson.get("token")
-        if token is None:
-            raise AirflowException(
-                "Dataprep token is missing or has invalid format. "
-                "Please make sure that Dataprep token is added to the Airflow Connections."
-            )
-        return token
-
     @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
     def get_jobs_for_job_group(self, job_id: int) -> Dict[str, Any]:
         """
         Get information about the batch jobs within a Cloud Dataprep job.
 
-        :param job_id The ID of the job that will be fetched.
+        :param job_id The ID of the job that will be fetched
         :type job_id: int
         """
-        url: str = f"{self._url}/{job_id}/jobs"
+
+        endpoint_path = f"/v4/jobGroups/{job_id}/jobs"
+        url: str = os.path.join(self._base_url, endpoint_path)
         response = requests.get(url, headers=self._headers)
         response.raise_for_status()

Review comment:
       That's great. 
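One detail in the hunk above may be worth a quick check: `endpoint_path` starts with `/` in this revision and without it in a later one quoted in this thread. A minimal sketch of how the join behaves in each case, using `posixpath.join` (what `os.path.join` resolves to on POSIX systems) and `urljoin` as one possible alternative. The job id `1` is a placeholder.

```python
import posixpath
from urllib.parse import urljoin

BASE_URL = "https://api.clouddataprep.com"

# A second argument that starts with "/" is treated as an absolute path,
# so the base URL is discarded.
absolute = posixpath.join(BASE_URL, "/v4/jobGroups/1/jobs")

# Without the leading slash the two parts are joined as expected.
relative = posixpath.join(BASE_URL, "v4/jobGroups/1/jobs")

# urljoin follows URL resolution rules instead of filesystem rules.
joined = urljoin(BASE_URL + "/", "v4/jobGroups/1/jobs")

print(absolute)
print(relative)
print(joined)
```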







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r480967140



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -50,26 +55,63 @@ def _headers(self) -> Dict[str, str]:
         }
         return headers
 
-    @property
-    def _token(self) -> str:
-        conn = self.get_connection(self.dataprep_conn_id)
-        token = conn.extra_dejson.get("token")
-        if token is None:
-            raise AirflowException(
-                "Dataprep token is missing or has invalid format. "
-                "Please make sure that Dataprep token is added to the Airflow Connections."
-            )
-        return token
-
     @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
     def get_jobs_for_job_group(self, job_id: int) -> Dict[str, Any]:
         """
         Get information about the batch jobs within a Cloud Dataprep job.
 
-        :param job_id The ID of the job that will be fetched.
+        :param job_id The ID of the job that will be fetched
         :type job_id: int
         """
-        url: str = f"{self._url}/{job_id}/jobs"
+
+        endpoint_path = f"v4/jobGroups/{job_id}/jobs"
+        url: str = os.path.join(self._base_url, endpoint_path)
         response = requests.get(url, headers=self._headers)
-        response.raise_for_status()
+        self._raise_for_status(response)
+        return response.json()
+
+    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
+    def get_job_group(self, job_group_id: int, embed: str, include_deleted: bool) -> Dict[str, Any]:
+        """
+        Get the specified job group.
+        A job group is a job that is executed from a specific node in a flow.
+
+        :param job_group_id The ID of the job that will be fetched
+        :type job_group_id: int
+        :param embed Comma-separated list of objects to pull in as part of the response
+        :type embed: str
+        :param include_deleted if set to "true", will include deleted objects
+        :type include_deleted: bool
+        """
+
+        params: Dict[str, Any] = {"embed": embed, "includeDeleted": include_deleted}
+        endpoint_path = f"v4/jobGroups/{job_group_id}"
+        url: str = os.path.join(self._base_url, endpoint_path)
+        response = requests.get(url, headers=self._headers, params=params)
+        self._raise_for_status(response)
+        return response.json()
+
+    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
+    def run_job_group(self, body_request: dict) -> Dict[str, Any]:
+        """
+        Creates a ``jobGroup``, which launches the specified job as the authenticated user.
+        This performs the same action as clicking on the Run Job button in the application.
+        To get recipe_id please follow the Dataprep API documentation
+        https://clouddataprep.com/documentation/api#operation/runJobGroup
+
+        :param body_request: The identifier for the recipe you would like to run.
+        :type body_request: dict
+        """
+
+        endpoint_path = "v4/jobGroups"
+        url: str = os.path.join(self._base_url, endpoint_path)
+        response = requests.post(url, headers=self._headers, data=json.dumps(body_request))
+        self._raise_for_status(response)
         return response.json()
+
+    def _raise_for_status(self, response):

Review comment:
       Would you mind adding type annotation?







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r477615415



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -50,26 +54,60 @@ def _headers(self) -> Dict[str, str]:
         }
         return headers
 
-    @property
-    def _token(self) -> str:
-        conn = self.get_connection(self.dataprep_conn_id)
-        token = conn.extra_dejson.get("token")
-        if token is None:
-            raise AirflowException(
-                "Dataprep token is missing or has invalid format. "
-                "Please make sure that Dataprep token is added to the Airflow Connections."
-            )
-        return token
-
     @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
     def get_jobs_for_job_group(self, job_id: int) -> Dict[str, Any]:
         """
         Get information about the batch jobs within a Cloud Dataprep job.
 
-        :param job_id The ID of the job that will be fetched.
+        :param job_id The ID of the job that will be fetched
         :type job_id: int
         """
-        url: str = f"{self._url}/{job_id}/jobs"
+
+        endpoint_path = f"/v4/jobGroups/{job_id}/jobs"
+        url: str = os.path.join(self._base_url, endpoint_path)
         response = requests.get(url, headers=self._headers)
         response.raise_for_status()

Review comment:
       It would be nice to fix it, because when running the system test I get:
   ```
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/tenacity/__init__.py", line 362, in call
       result = fn(*args, **kwargs)
     File "/opt/airflow/airflow/providers/google/cloud/hooks/dataprep.py", line 112, in run_job_group
       response.raise_for_status()
     File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 941, in raise_for_status
       raise HTTPError(http_error_msg, response=self)
   requests.exceptions.HTTPError: 404 Client Error: Not Found for url: https://api.clouddataprep.com/v4/jobGroups
   ```
   and I'm quite sure that during debugging we saw a nice error message
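A minimal sketch of one way to surface the response body next to the status line, assuming the body carries the more helpful message. `HTTPError` and `FakeResponse` are stand-ins so the sketch runs without `requests`, and `raise_for_status_with_body` is a hypothetical helper, not the `_raise_for_status` method from the PR.

```python
class HTTPError(Exception):
    """Stand-in for requests.exceptions.HTTPError (assumption, keeps the sketch dependency-free)."""


class FakeResponse:
    """Tiny stand-in exposing the two attributes the helper relies on."""

    def __init__(self, status_code: int, text: str) -> None:
        self.status_code = status_code
        self.text = text

    def raise_for_status(self) -> None:
        if self.status_code >= 400:
            raise HTTPError(f"{self.status_code} Client Error")


def raise_for_status_with_body(response) -> None:
    """Re-raise an HTTP error with the response body appended to the message."""
    try:
        response.raise_for_status()
    except HTTPError as err:
        raise HTTPError(f"{err}: {response.text}") from err


try:
    raise_for_status_with_body(FakeResponse(404, "example error body"))
except HTTPError as err:
    message = str(err)
    print(message)
```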







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r477609564



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -40,7 +41,10 @@ class GoogleDataprepHook(BaseHook):
     def __init__(self, dataprep_conn_id: str = "dataprep_conn_id") -> None:
         super().__init__()
         self.dataprep_conn_id = dataprep_conn_id
-        self._url = "https://api.clouddataprep.com/v4/jobGroups"
+        conn = self.get_connection(self.dataprep_conn_id)
+        extra_dejson = conn.extra_dejson
+        self._token = extra_dejson.get("token")

Review comment:
       ```suggestion
           self._token = extra_dejson.get("extra__dataprep__token")
   ```
   According to documentation and Airflow best practices 
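A minimal sketch of reading the token under the prefixed key, assuming the connection's Extra field holds a JSON object as described in the docs diff in this thread. `MissingTokenError` stands in for `AirflowException`, and `get_dataprep_token` is a hypothetical helper.

```python
import json


class MissingTokenError(Exception):
    """Stand-in for AirflowException so the sketch needs no Airflow install."""


def get_dataprep_token(extra_json: str) -> str:
    """Return the Dataprep token stored in a connection's Extra JSON (hypothetical helper)."""
    extra = json.loads(extra_json) if extra_json else {}
    token = extra.get("extra__dataprep__token")
    if token is None:
        raise MissingTokenError(
            "Dataprep token is missing or has invalid format. "
            "Please make sure that Dataprep token is added to the Airflow Connections."
        )
    return token


token = get_dataprep_token('{"extra__dataprep__token": "TOKEN"}')
print(token)
```

Raising when the key is absent keeps the check that the removed `_token` property performed.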







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r477609796



##########
File path: tests/providers/google/cloud/operators/test_dataprep_system.py
##########
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from os import environ
+
+import pytest
+
+from airflow.models import Connection
+from airflow.utils.session import create_session
+from tests.test_utils.db import clear_db_connections
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest
+
+TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")
+EXTRA = {"token": TOKEN}

Review comment:
       ```suggestion
   EXTRA = {"extra__dataprep__token": TOKEN}
   ```







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r476239468



##########
File path: airflow/providers/google/cloud/operators/dataprep.py
##########
@@ -51,6 +50,68 @@ def __init__(
 
     def execute(self, context: Dict):
         self.log.info("Fetching data for job with id: %d ...", self.job_id)
-        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id",)
         response = hook.get_jobs_for_job_group(job_id=self.job_id)
         return response
+
+
+class DataprepGetJobGroupOperator(BaseOperator):
+    """
+    Get the specified job group.
+    A job group is a job that is executed from a specific node in a flow.
+    API documentation https://clouddataprep.com/documentation/api#section/Overview
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DataprepGetJobGroupOperator`
+
+    :param job_id The ID of the job that will be requests
+    :type job_id: int
+    :param embed Comma-separated list of objects to pull in as part of the response
+    :type embed: string
+    :param include_deleted if set to "true", will include deleted objects
+    :type include_deleted: bool
+    """
+
+    template_fields = ("job_id",)
+
+    @apply_defaults
+    def __init__(
+        self, *, job_id: int, embed: str, include_deleted: bool, **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+        self.job_id = job_id
+        self.embed = embed
+        self.include_deleted = include_deleted
+
+    def execute(self, context: Dict):
+        self.log.info("Fetching data for job with id: %d ...", self.job_id)
+        hook = GoogleDataprepHook(dataprep_conn_id="dataprep_conn_id")
+        response = hook.get_job_group(job_id=self.job_id, embed="", include_deleted=self.include_deleted)

Review comment:
       Yes... it even looks strange. Good catch.







[GitHub] [airflow] mik-laj commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475529555



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -66,10 +67,57 @@ def get_jobs_for_job_group(self, job_id: int) -> Dict[str, Any]:
         """
         Get information about the batch jobs within a Cloud Dataprep job.
 
-        :param job_id The ID of the job that will be fetched.
+        :param job_id The ID of the job that will be fetched
         :type job_id: int
         """
+
         url: str = f"{self._url}/{job_id}/jobs"
         response = requests.get(url, headers=self._headers)
         response.raise_for_status()
         return response.json()
+
+    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
+    def get_job_group(self, job_id: int, embed: str, include_deleted: bool) -> Dict[str, Any]:

Review comment:
       ```suggestion
       def get_job_group(self, job_group_id: int, embed: str, include_deleted: bool) -> Dict[str, Any]:
   ```
   ?







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475656693



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -66,10 +67,57 @@ def get_jobs_for_job_group(self, job_id: int) -> Dict[str, Any]:
         """
         Get information about the batch jobs within a Cloud Dataprep job.
 
-        :param job_id The ID of the job that will be fetched.
+        :param job_id The ID of the job that will be fetched
         :type job_id: int
         """
+
         url: str = f"{self._url}/{job_id}/jobs"
         response = requests.get(url, headers=self._headers)
         response.raise_for_status()
         return response.json()
+
+    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
+    def get_job_group(self, job_id: int, embed: str, include_deleted: bool) -> Dict[str, Any]:

Review comment:
       Done.







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475509378



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -82,11 +83,11 @@ def get_job_group(self, job_id: int, embed: str, include_deleted: bool) -> Dict[
         A job group is a job that is executed from a specific node in a flow.
 
         :param job_id The ID of the job that will be fetched.
-        :type job_id: int
+        :type job_id: int.
         :param embed Comma-separated list of objects to pull in as part of the response.
-        :type embed: string
+        :type embed: string.
         :param include_deleted if set to "true", will include deleted objects.
-        :type include_deleted: bool
+        :type include_deleted: bool.

Review comment:
       Looking for `:type .*: .* \.` in `airflow/providers` returns no records
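A minimal sketch of this kind of check in Python, for a single docstring rather than the whole `airflow/providers` tree. The regular expression is an assumption of this sketch and is not the exact pattern quoted above.

```python
import re

# Matches a ":type name: something." line, i.e. a type field ending in a period.
TRAILING_DOT = re.compile(r"^\s*:type \w+: .*\.\s*$")


def find_type_lines_with_trailing_dot(text: str) -> list:
    """Return the stripped ':type' lines that end with a period."""
    return [line.strip() for line in text.splitlines() if TRAILING_DOT.match(line)]


docstring = """
    :param job_id: The ID of the job that will be fetched
    :type job_id: int.
    :param embed: Comma-separated list of objects to pull in as part of the response
    :type embed: str
"""

matches = find_type_lines_with_trailing_dot(docstring)
print(matches)
```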







[GitHub] [airflow] mik-laj commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475661352



##########
File path: tests/providers/google/cloud/operators/test_dataprep_system.py
##########
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from os import environ
+
+from airflow.models import Connection
+from airflow.utils.session import create_session
+from tests.test_utils.db import clear_db_connections
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest
+
+TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")

Review comment:
       The missing variable should be detected when you try to run the test. When it is missing, a user-friendly message should be displayed. We don't have tests for authorization data in environment variables yet, so you will probably need to create one.
   See: 
   https://github.com/apache/airflow/blob/7ee7d7c/TESTING.rst#adding-a-new-system-test
   https://github.com/apache/airflow/blob/7ee7d7c/tests/conftest.py#L337-L343
   https://github.com/apache/airflow/blob/7ee7d7c/tests/conftest.py#L248-L250
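A minimal sketch of detecting the missing variable up front with a readable message, assuming a plain helper is enough for illustration. `require_env` is hypothetical and is not the mechanism in `tests/conftest.py` linked above.

```python
from typing import Mapping


def require_env(name: str, env: Mapping[str, str]) -> str:
    """Return the variable's value, or fail with a message that says what to do (hypothetical helper)."""
    value = env.get(name)
    if not value:
        raise RuntimeError(
            f"Environment variable {name} is not set. "
            "Export it before running the Dataprep system test."
        )
    return value


# A dict stands in for os.environ so the sketch runs without touching the real environment.
token = require_env("DATAPREP_TOKEN", {"DATAPREP_TOKEN": "example-token"})
print(token)
```

In a real test module the second argument would be `os.environ`, and the failure would show up as soon as the test is collected or set up rather than as an authentication error later on.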
   







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r480967800



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -50,26 +55,63 @@ def _headers(self) -> Dict[str, str]:
         }
         return headers
 
-    @property
-    def _token(self) -> str:
-        conn = self.get_connection(self.dataprep_conn_id)
-        token = conn.extra_dejson.get("token")
-        if token is None:
-            raise AirflowException(
-                "Dataprep token is missing or has invalid format. "
-                "Please make sure that Dataprep token is added to the Airflow Connections."
-            )
-        return token
-
     @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
     def get_jobs_for_job_group(self, job_id: int) -> Dict[str, Any]:
         """
         Get information about the batch jobs within a Cloud Dataprep job.
 
-        :param job_id The ID of the job that will be fetched.
+        :param job_id The ID of the job that will be fetched
         :type job_id: int
         """
-        url: str = f"{self._url}/{job_id}/jobs"
+
+        endpoint_path = f"v4/jobGroups/{job_id}/jobs"
+        url: str = os.path.join(self._base_url, endpoint_path)
         response = requests.get(url, headers=self._headers)
-        response.raise_for_status()
+        self._raise_for_status(response)
+        return response.json()
+
+    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
+    def get_job_group(self, job_group_id: int, embed: str, include_deleted: bool) -> Dict[str, Any]:
+        """
+        Get the specified job group.
+        A job group is a job that is executed from a specific node in a flow.
+
+        :param job_group_id The ID of the job that will be fetched

Review comment:
       ```suggestion
           :param job_group_id: The ID of the job that will be fetched
   ```

##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -50,26 +55,63 @@ def _headers(self) -> Dict[str, str]:
         }
         return headers
 
-    @property
-    def _token(self) -> str:
-        conn = self.get_connection(self.dataprep_conn_id)
-        token = conn.extra_dejson.get("token")
-        if token is None:
-            raise AirflowException(
-                "Dataprep token is missing or has invalid format. "
-                "Please make sure that Dataprep token is added to the Airflow Connections."
-            )
-        return token
-
     @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
     def get_jobs_for_job_group(self, job_id: int) -> Dict[str, Any]:
         """
         Get information about the batch jobs within a Cloud Dataprep job.
 
-        :param job_id The ID of the job that will be fetched.
+        :param job_id The ID of the job that will be fetched
         :type job_id: int
         """
-        url: str = f"{self._url}/{job_id}/jobs"
+
+        endpoint_path = f"v4/jobGroups/{job_id}/jobs"
+        url: str = os.path.join(self._base_url, endpoint_path)
         response = requests.get(url, headers=self._headers)
-        response.raise_for_status()
+        self._raise_for_status(response)
+        return response.json()
+
+    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
+    def get_job_group(self, job_group_id: int, embed: str, include_deleted: bool) -> Dict[str, Any]:
+        """
+        Get the specified job group.
+        A job group is a job that is executed from a specific node in a flow.
+
+        :param job_group_id The ID of the job that will be fetched
+        :type job_group_id: int
+        :param embed Comma-separated list of objects to pull in as part of the response

Review comment:
       ```suggestion
           :param embed: Comma-separated list of objects to pull in as part of the response
   ```







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r475462441



##########
File path: airflow/providers/google/cloud/hooks/dataprep.py
##########
@@ -82,11 +83,11 @@ def get_job_group(self, job_id: int, embed: str, include_deleted: bool) -> Dict[
         A job group is a job that is executed from a specific node in a flow.
 
         :param job_id The ID of the job that will be fetched.
-        :type job_id: int
+        :type job_id: int.
         :param embed Comma-separated list of objects to pull in as part of the response.
-        :type embed: string
+        :type embed: string.
         :param include_deleted if set to "true", will include deleted objects.
-        :type include_deleted: bool
+        :type include_deleted: bool.

Review comment:
       ```suggestion
           :type include_deleted: bool
   ```
   We avoid dots in type docs
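
For reference, a sketch of the docstring style these suggestions point toward: a colon after each parameter name and no trailing dot on the `:type` lines. The function body is a placeholder for illustration only, and `str` is used for `embed` to match the type annotation rather than the `string` spelling in the diff.

```python
from typing import Any, Dict


def get_job_group(job_group_id: int, embed: str, include_deleted: bool) -> Dict[str, Any]:
    """
    Get the specified job group.

    :param job_group_id: The ID of the job that will be fetched
    :type job_group_id: int
    :param embed: Comma-separated list of objects to pull in as part of the response
    :type embed: str
    :param include_deleted: If set to True, deleted objects are included
    :type include_deleted: bool
    """
    # Placeholder body; the real hook method calls the Dataprep API instead.
    return {"id": job_group_id, "embed": embed, "include_deleted": include_deleted}
```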







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: WIP Added operator, hook, docs and tests for Dataprep

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r477325874



##########
File path: tests/conftest.py
##########
@@ -251,6 +253,9 @@ def pytest_configure(config):
     config.addinivalue_line(
         "markers", "airflow_2: mark tests that works only on Airflow 2.0 / master"
     )
+    config.addinivalue_line(
+        "markers", "credential_env(name): mark tests that require credential env in CREDENTIALS_ENV"
+    )

Review comment:
       I think this is indeed over-engineering. To run this test I have to create a file, which requires much more work than exporting an env variable. It also assumes that the test is run from the breeze environment or that I set up another env variable that points to another directory...







[GitHub] [airflow] turbaszek commented on a change in pull request #10304: Added DataprepGetJobGroupOperator and DataprepRunJobGroupOperator for Dataprep service

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10304:
URL: https://github.com/apache/airflow/pull/10304#discussion_r477609212



##########
File path: docs/howto/operator/google/cloud/dataprep.rst
##########
@@ -17,7 +17,29 @@
 
 Google Dataprep Operators
 =========================
-`Google Dataprep API documentation is available here <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`__
+Dataprep is the intelligent cloud data service to visually explore, clean, and prepare data for analysis and machine learning.
+Service can be use to explore and transform raw data from disparate and/or large datasets into clean and structured data for further analysis and processing.
+Dataprep Job is an internal object encoding the information necessary to run a part of a Cloud Dataprep job group.
+For more information about the service visit `Google Dataprep API documentation <https://cloud.google.com/dataprep/docs/html/API-Reference_145281441>`_
+
+Before you begin
+^^^^^^^^^^^^^^^^
+Before using Dataprep within Airflow you need to authenticate your account with TOKEN.
+To get connection Dataprep with Airflow you need Dataprep token. Please follow Dataprep `instructions <https://clouddataprep.com/documentation/api#section/Authentication>`_ to do it.
+
+TOKEN should be added to the Connection in Airflow in JSON format.
+You can check `how to do such connection <https://airflow.readthedocs.io/en/stable/howto/connection/index.html#editing-a-connection-with-the-ui>`_.
+
+The DataprepRunJobGroupOperator will run specified job. Operator required a recipe id. To identify the recipe id please use `API documentation for runJobGroup <https://clouddataprep.com/documentation/api#operation/runJobGroup>`_
+E.g. if the URL is /flows/10?recipe=7, the recipe id is 7. The recipe cannot be created via this operator. It can be created only via UI which is available `here <https://clouddataprep.com/>`_.
+Some of parameters can be override by DAG's body request. How to do it is shown in example dag.
+
+See following example:
+Set values for these fields:
+.. code-block::
+
+  Conn Id: "your_conn_id"
+  Extra: {"extra__dataprep__token": "TOKEN"}

Review comment:
       Should we add information about the configurable API URL?
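
If the docs do cover it, the connection example could show the URL next to the token. A rough sketch, assuming the hook reads a base URL from the connection extras. The key name `extra__dataprep__base_url`, the default URL, and both helper functions are assumptions for illustration and are not confirmed by this diff.

```python
import json

# Assumed default; the diff only shows that the hook has a ``_base_url`` attribute.
DEFAULT_BASE_URL = "https://api.clouddataprep.com"


def build_extra(token: str, base_url: str = DEFAULT_BASE_URL) -> str:
    """Serialize the connection Extra field as JSON."""
    return json.dumps({
        "extra__dataprep__token": token,
        # Hypothetical key name for the configurable API URL.
        "extra__dataprep__base_url": base_url,
    })


def job_group_jobs_url(extra: str, job_id: int) -> str:
    """Build the jobs endpoint URL from the serialized Extra field."""
    base_url = json.loads(extra)["extra__dataprep__base_url"]
    # Strip a trailing slash so the joined URL has exactly one separator.
    return f"{base_url.rstrip('/')}/v4/jobGroups/{job_id}/jobs"
```

The docs could then list both keys under `Extra`, with a note that the URL only needs to be set when it differs from the default.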



