You are viewing a plain-text version of this content. The canonical link to the original was provided as a hyperlink, which is not preserved in this plain-text version.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/06/19 22:06:52 UTC
[airflow] branch main updated: Migrate Google gcs_to_sheets DAG to new design AIP-47 (#24501)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new dd35fdaf35 Migrate Google gcs_to_sheets DAG to new design AIP-47 (#24501)
dd35fdaf35 is described below
commit dd35fdaf35b6e46fd69a1b1da36ae7ffc0505dcb
Author: Chenglong Yan <al...@gmail.com>
AuthorDate: Mon Jun 20 06:06:46 2022 +0800
Migrate Google gcs_to_sheets DAG to new design AIP-47 (#24501)
related: #22430, #22447
---
.../operators/transfer/gcs_to_sheets.rst | 2 +-
.../suite/transfers/test_gcs_to_sheets_system.py | 46 ----------------------
.../google/cloud/gcs}/example_gcs_to_sheets.py | 44 ++++++++++++++++++---
3 files changed, 39 insertions(+), 53 deletions(-)
diff --git a/docs/apache-airflow-providers-google/operators/transfer/gcs_to_sheets.rst b/docs/apache-airflow-providers-google/operators/transfer/gcs_to_sheets.rst
index 74c8a28344..4ae780e1a8 100644
--- a/docs/apache-airflow-providers-google/operators/transfer/gcs_to_sheets.rst
+++ b/docs/apache-airflow-providers-google/operators/transfer/gcs_to_sheets.rst
@@ -38,7 +38,7 @@ Upload data from GCS to Google Sheets
To upload data from Google Cloud Storage to Google Spreadsheet you can use the
:class:`~airflow.providers.google.suite.transfers.gcs_to_sheets.GCSToGoogleSheetsOperator`.
-.. exampleinclude:: /../../airflow/providers/google/suite/example_dags/example_gcs_to_sheets.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_gcs_to_sheets.py
:language: python
:dedent: 4
:start-after: [START upload_gcs_to_sheets]
diff --git a/tests/providers/google/suite/transfers/test_gcs_to_sheets_system.py b/tests/providers/google/suite/transfers/test_gcs_to_sheets_system.py
deleted file mode 100644
index 0ab198275b..0000000000
--- a/tests/providers/google/suite/transfers/test_gcs_to_sheets_system.py
+++ /dev/null
@@ -1,46 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import pytest
-
-from airflow.providers.google.suite.example_dags.example_gcs_to_sheets import BUCKET
-from tests.providers.google.cloud.utils.gcp_authenticator import GCP_GCS_KEY
-from tests.test_utils.gcp_system_helpers import GSUITE_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
-
-# Required scopes
-SCOPES = [
- 'https://www.googleapis.com/auth/drive',
- 'https://www.googleapis.com/auth/cloud-platform',
-]
-
-
-@pytest.mark.backend("mysql", "postgres")
-@pytest.mark.credential_file(GCP_GCS_KEY)
-class GoogleSheetsToGCSExampleDagsSystemTest(GoogleSystemTest):
- @provide_gcp_context(GCP_GCS_KEY)
- def setUp(self):
- super().setUp()
- self.create_gcs_bucket(BUCKET)
-
- @provide_gcp_context(GCP_GCS_KEY, scopes=SCOPES)
- def test_run_example_dag_function(self):
- self.run_dag('example_gcs_to_sheets', GSUITE_DAG_FOLDER)
-
- @provide_gcp_context(GCP_GCS_KEY)
- def tearDown(self):
- self.delete_gcs_bucket(BUCKET)
- super().tearDown()
diff --git a/airflow/providers/google/suite/example_dags/example_gcs_to_sheets.py b/tests/system/providers/google/cloud/gcs/example_gcs_to_sheets.py
similarity index 57%
rename from airflow/providers/google/suite/example_dags/example_gcs_to_sheets.py
rename to tests/system/providers/google/cloud/gcs/example_gcs_to_sheets.py
index 7385d49f29..5c7f2109d2 100644
--- a/airflow/providers/google/suite/example_dags/example_gcs_to_sheets.py
+++ b/tests/system/providers/google/cloud/gcs/example_gcs_to_sheets.py
@@ -20,34 +20,66 @@ import os
from datetime import datetime
from airflow import models
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.transfers.sheets_to_gcs import GoogleSheetsToGCSOperator
from airflow.providers.google.suite.transfers.gcs_to_sheets import GCSToGoogleSheetsOperator
+from airflow.utils.trigger_rule import TriggerRule
-BUCKET = os.environ.get("GCP_GCS_BUCKET", "example-test-bucket3")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+DAG_ID = "example_gcs_to_sheets"
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
SPREADSHEET_ID = os.environ.get("SPREADSHEET_ID", "example-spreadsheetID")
NEW_SPREADSHEET_ID = os.environ.get("NEW_SPREADSHEET_ID", "1234567890qwerty")
with models.DAG(
- "example_gcs_to_sheets",
+ DAG_ID,
start_date=datetime(2021, 1, 1),
schedule_interval='@once', # Override to match your needs
catchup=False,
- tags=["example"],
+ tags=["example", "gcs"],
) as dag:
+ create_bucket = GCSCreateBucketOperator(
+ task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
+ )
upload_sheet_to_gcs = GoogleSheetsToGCSOperator(
task_id="upload_sheet_to_gcs",
- destination_bucket=BUCKET,
+ destination_bucket=BUCKET_NAME,
spreadsheet_id=SPREADSHEET_ID,
)
# [START upload_gcs_to_sheets]
upload_gcs_to_sheet = GCSToGoogleSheetsOperator(
task_id="upload_gcs_to_sheet",
- bucket_name=BUCKET,
+ bucket_name=BUCKET_NAME,
object_name="{{ task_instance.xcom_pull('upload_sheet_to_gcs')[0] }}",
spreadsheet_id=NEW_SPREADSHEET_ID,
)
# [END upload_gcs_to_sheets]
- upload_sheet_to_gcs >> upload_gcs_to_sheet
+ delete_bucket = GCSDeleteBucketOperator(
+ task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+ )
+
+ (
+ # TEST SETUP
+ create_bucket
+ >> upload_sheet_to_gcs
+ # TEST BODY
+ >> upload_gcs_to_sheet
+ # TEST TEARDOWN
+ >> delete_bucket
+ )
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)