You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/09/09 00:58:52 UTC

[airflow] branch main updated: Life Science assets & system tests migration (AIP-47) (#25548)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a539ff663 Life Science assets & system tests migration (AIP-47) (#25548)
3a539ff663 is described below

commit 3a539ff6631109dc58514339ac60672f031c7054
Author: bkossakowska <10...@users.noreply.github.com>
AuthorDate: Fri Sep 9 00:58:43 2022 +0000

    Life Science assets & system tests migration (AIP-47) (#25548)
---
 .../providers/google/cloud/links/life_sciences.py  | 48 ++++++++++++++++++++
 .../google/cloud/operators/life_sciences.py        | 10 +++-
 airflow/providers/google/provider.yaml             |  1 +
 .../google/cloud/operators/test_life_sciences.py   |  7 ++-
 .../cloud/transfers/test_gdrive_to_gcs_system.py   |  6 +--
 .../google/cloud/utils/gcp_authenticator.py        |  1 -
 .../cloud/life_sciences/example_life_sciences.py   | 53 ++++++++++++++++++----
 .../cloud/life_sciences/resources/__init__.py      | 16 +++++++
 .../google/cloud/life_sciences/resources/file      |  0
 9 files changed, 126 insertions(+), 16 deletions(-)

diff --git a/airflow/providers/google/cloud/links/life_sciences.py b/airflow/providers/google/cloud/links/life_sciences.py
new file mode 100644
index 0000000000..5e9b3a5f68
--- /dev/null
+++ b/airflow/providers/google/cloud/links/life_sciences.py
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import TYPE_CHECKING
+
+from airflow.providers.google.cloud.links.base import BaseGoogleLink
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+BASE_LINK = "https://console.cloud.google.com/lifesciences"  # Cloud Console root for Life Sciences
+LIFESCIENCES_LIST_LINK = BASE_LINK + "/pipelines?project={project_id}"  # single placeholder: {project_id}
+
+
+class LifeSciencesLink(BaseGoogleLink):
+    """Operator extra link to the Cloud Console list of Life Sciences pipelines for a project."""
+
+    name = "Life Sciences"  # link name; presumably the label shown in the UI
+    key = "lifesciences_key"  # XCom key that persist() writes the link parameters under
+    format_str = LIFESCIENCES_LIST_LINK  # presumably formatted by BaseGoogleLink from XCom; verify
+
+    @staticmethod
+    def persist(
+        context: "Context",  # task context, passed through to xcom_push
+        task_instance,  # any object with xcom_push(context=..., key=..., value=...)
+        project_id: str,  # value for the {project_id} placeholder in format_str
+    ) -> None:  # Push {"project_id": project_id} to XCom under LifeSciencesLink.key.
+        task_instance.xcom_push(
+            context=context,
+            key=LifeSciencesLink.key,
+            value={
+                "project_id": project_id,
+            },
+        )
diff --git a/airflow/providers/google/cloud/operators/life_sciences.py b/airflow/providers/google/cloud/operators/life_sciences.py
index a3fc1b3ff5..f9ce575ada 100644
--- a/airflow/providers/google/cloud/operators/life_sciences.py
+++ b/airflow/providers/google/cloud/operators/life_sciences.py
@@ -22,6 +22,7 @@ from typing import TYPE_CHECKING, Optional, Sequence, Union
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.life_sciences import LifeSciencesHook
+from airflow.providers.google.cloud.links.life_sciences import LifeSciencesLink
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
@@ -57,6 +58,7 @@ class LifeSciencesRunPipelineOperator(BaseOperator):
         "api_version",
         "impersonation_chain",
     )
+    operator_extra_links = (LifeSciencesLink(),)
 
     def __init__(
         self,
@@ -90,5 +92,11 @@ class LifeSciencesRunPipelineOperator(BaseOperator):
             api_version=self.api_version,
             impersonation_chain=self.impersonation_chain,
         )
-
+        project_id = self.project_id or hook.project_id
+        if project_id:
+            LifeSciencesLink.persist(
+                context=context,
+                task_instance=self,
+                project_id=project_id,
+            )
         return hook.run_pipeline(body=self.body, location=self.location, project_id=self.project_id)
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index a3edbf5867..626bc7e7f4 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -1014,6 +1014,7 @@ extra-links:
   - airflow.providers.google.cloud.links.cloud_build.CloudBuildListLink
   - airflow.providers.google.cloud.links.cloud_build.CloudBuildTriggersListLink
   - airflow.providers.google.cloud.links.cloud_build.CloudBuildTriggerDetailsLink
+  - airflow.providers.google.cloud.links.life_sciences.LifeSciencesLink
   - airflow.providers.google.common.links.storage.StorageLink
   - airflow.providers.google.common.links.storage.FileDetailsLink
 
diff --git a/tests/providers/google/cloud/operators/test_life_sciences.py b/tests/providers/google/cloud/operators/test_life_sciences.py
index 1e86bd6c44..097c5dff69 100644
--- a/tests/providers/google/cloud/operators/test_life_sciences.py
+++ b/tests/providers/google/cloud/operators/test_life_sciences.py
@@ -42,7 +42,9 @@ class TestLifeSciencesRunPipelineOperator(unittest.TestCase):
         operator = LifeSciencesRunPipelineOperator(
             task_id='task-id', body=TEST_BODY, location=TEST_LOCATION, project_id=TEST_PROJECT_ID
         )
-        result = operator.execute(None)
+        context = mock.MagicMock()
+        result = operator.execute(context=context)
+
         assert result == TEST_OPERATION
 
     @mock.patch("airflow.providers.google.cloud.operators.life_sciences.LifeSciencesHook")
@@ -54,5 +56,6 @@ class TestLifeSciencesRunPipelineOperator(unittest.TestCase):
             body=TEST_BODY,
             location=TEST_LOCATION,
         )
-        result = operator.execute(None)
+        context = mock.MagicMock()
+        result = operator.execute(context=context)
         assert result == TEST_OPERATION
diff --git a/tests/providers/google/cloud/transfers/test_gdrive_to_gcs_system.py b/tests/providers/google/cloud/transfers/test_gdrive_to_gcs_system.py
index 505508ce84..dbfea81f8e 100644
--- a/tests/providers/google/cloud/transfers/test_gdrive_to_gcs_system.py
+++ b/tests/providers/google/cloud/transfers/test_gdrive_to_gcs_system.py
@@ -18,7 +18,7 @@
 import pytest
 
 from tests.providers.google.cloud.utils.gcp_authenticator import GCP_GCS_KEY
-from tests.system.providers.google.cloud.life_sciences.example_life_sciences import BUCKET
+from tests.system.providers.google.cloud.life_sciences.example_life_sciences import BUCKET_NAME
 from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
 
 
@@ -28,7 +28,7 @@ class GoogleDriveToGCSExampleDagsSystemTest(GoogleSystemTest):
     @provide_gcp_context(GCP_GCS_KEY)
     def setUp(self):
         super().setUp()
-        self.create_gcs_bucket(BUCKET)
+        self.create_gcs_bucket(BUCKET_NAME)
 
     @provide_gcp_context(GCP_GCS_KEY)
     def test_run_example_dag_function(self):
@@ -36,5 +36,5 @@ class GoogleDriveToGCSExampleDagsSystemTest(GoogleSystemTest):
 
     @provide_gcp_context(GCP_GCS_KEY)
     def tearDown(self):
-        self.delete_gcs_bucket(BUCKET)
+        self.delete_gcs_bucket(BUCKET_NAME)
         super().tearDown()
diff --git a/tests/providers/google/cloud/utils/gcp_authenticator.py b/tests/providers/google/cloud/utils/gcp_authenticator.py
index 044b962a33..a4d087ce71 100644
--- a/tests/providers/google/cloud/utils/gcp_authenticator.py
+++ b/tests/providers/google/cloud/utils/gcp_authenticator.py
@@ -48,7 +48,6 @@ GCP_GCS_KEY = 'gcp_gcs.json'
 GCP_GCS_TRANSFER_KEY = 'gcp_gcs_transfer.json'
 GCP_GKE_KEY = "gcp_gke.json"
 GCP_KMS_KEY = "gcp_kms.json"
-GCP_LIFE_SCIENCES_KEY = 'gcp_life_sciences.json'
 GCP_MEMORYSTORE = 'gcp_memorystore.json'
 GCP_PUBSUB_KEY = "gcp_pubsub.json"
 GCP_SECRET_MANAGER_KEY = 'gcp_secret_manager.json'
diff --git a/tests/system/providers/google/cloud/life_sciences/example_life_sciences.py b/tests/system/providers/google/cloud/life_sciences/example_life_sciences.py
index c56ea87f9c..9d68261c7e 100644
--- a/tests/system/providers/google/cloud/life_sciences/example_life_sciences.py
+++ b/tests/system/providers/google/cloud/life_sciences/example_life_sciences.py
@@ -18,18 +18,26 @@
 
 import os
 from datetime import datetime
+from pathlib import Path
 
 from airflow import models
+from airflow.models.baseoperator import chain
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.cloud.operators.life_sciences import LifeSciencesRunPipelineOperator
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-DAG_ID = "example_gcp_life_sciences"
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+DAG_ID = "example_life_sciences"
 
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project-id")
-BUCKET = os.environ.get("GCP_GCS_LIFE_SCIENCES_BUCKET", "INVALID BUCKET NAME")
-FILENAME = os.environ.get("GCP_GCS_LIFE_SCIENCES_FILENAME", 'input.in')
-LOCATION = os.environ.get("GCP_LIFE_SCIENCES_LOCATION", 'us-central1')
+BUCKET_NAME = f"bucket_{DAG_ID}-{ENV_ID}"
 
+FILE_NAME = "file"
+LOCATION = "us-central1"
+
+CURRENT_FOLDER = Path(__file__).parent
+FILE_LOCAL_PATH = str(Path(CURRENT_FOLDER) / "resources" / FILE_NAME)
 
 # [START howto_configure_simple_action_pipeline]
 SIMPLE_ACTION_PIPELINE = {
@@ -53,7 +61,7 @@ MULTI_ACTION_PIPELINE = {
         "actions": [
             {
                 "imageUri": "google/cloud-sdk",
-                "commands": ["gsutil", "cp", f"gs://{BUCKET}/{FILENAME}", "/tmp"],
+                "commands": ["gsutil", "cp", f"gs://{BUCKET_NAME}/{FILE_NAME}", "/tmp"],
             },
             {"imageUri": "bash", "commands": ["-c", "echo Hello, world"]},
             {
@@ -61,8 +69,8 @@ MULTI_ACTION_PIPELINE = {
                 "commands": [
                     "gsutil",
                     "cp",
-                    f"gs://{BUCKET}/{FILENAME}",
-                    f"gs://{BUCKET}/output.in",
+                    f"gs://{BUCKET_NAME}/{FILE_NAME}",
+                    f"gs://{BUCKET_NAME}/output.in",
                 ],
             },
         ],
@@ -83,6 +91,14 @@ with models.DAG(
     catchup=False,
     tags=['example'],
 ) as dag:
+    create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)
+
+    upload_file = LocalFilesystemToGCSOperator(
+        task_id="upload_file",
+        src=FILE_LOCAL_PATH,
+        dst=FILE_NAME,
+        bucket=BUCKET_NAME,
+    )
 
     # [START howto_run_pipeline]
     simple_life_science_action_pipeline = LifeSciencesRunPipelineOperator(
@@ -97,7 +113,26 @@ with models.DAG(
         task_id='multi-action-pipeline', body=MULTI_ACTION_PIPELINE, project_id=PROJECT_ID, location=LOCATION
     )
 
-    simple_life_science_action_pipeline >> multiple_life_science_action_pipeline
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    chain(
+        # TEST SETUP
+        create_bucket,
+        upload_file,
+        # TEST BODY
+        simple_life_science_action_pipeline,
+        multiple_life_science_action_pipeline,
+        # TEST TEARDOWN
+        delete_bucket,
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
 
 from tests.system.utils import get_test_run  # noqa: E402
diff --git a/tests/system/providers/google/cloud/life_sciences/resources/__init__.py b/tests/system/providers/google/cloud/life_sciences/resources/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/system/providers/google/cloud/life_sciences/resources/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/system/providers/google/cloud/life_sciences/resources/file b/tests/system/providers/google/cloud/life_sciences/resources/file
new file mode 100644
index 0000000000..e69de29bb2