You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/01/22 12:05:39 UTC
[airflow] branch master updated: Improve environment variables in
GCP Datafusion system test (#13837)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new e7946f1 Improve environment variables in GCP Datafusion system test (#13837)
e7946f1 is described below
commit e7946f1cb7c144181443cbcc843d90bd597b09b5
Author: Tobiasz Kędzierski <to...@polidea.com>
AuthorDate: Fri Jan 22 13:05:17 2021 +0100
Improve environment variables in GCP Datafusion system test (#13837)
It will help to parametrize system tests
---
.../google/cloud/example_dags/example_datafusion.py | 15 ++++++++++-----
.../google/cloud/operators/test_datafusion_system.py | 10 +++++-----
2 files changed, 15 insertions(+), 10 deletions(-)
diff --git a/airflow/providers/google/cloud/example_dags/example_datafusion.py b/airflow/providers/google/cloud/example_dags/example_datafusion.py
index e5c18c5..93f238a 100644
--- a/airflow/providers/google/cloud/example_dags/example_datafusion.py
+++ b/airflow/providers/google/cloud/example_dags/example_datafusion.py
@@ -18,6 +18,7 @@
"""
Example Airflow DAG that shows how to use DataFusion.
"""
+import os
from airflow import models
from airflow.operators.bash import BashOperator
@@ -41,9 +42,13 @@ LOCATION = "europe-north1"
INSTANCE_NAME = "airflow-test-instance"
INSTANCE = {"type": "BASIC", "displayName": INSTANCE_NAME}
-BUCKET1 = "gs://test-bucket--2h83r23r"
-BUCKET2 = "gs://test-bucket--2d23h83r23r"
-PIPELINE_NAME = "airflow_test"
+BUCKET_1 = os.environ.get("GCP_DATAFUSION_BUCKET_1", "test-datafusion-bucket-1")
+BUCKET_2 = os.environ.get("GCP_DATAFUSION_BUCKET_2", "test-datafusion-bucket-2")
+
+BUCKET_1_URI = f"gs://{BUCKET_1}"
+BUCKET_2_URI = f"gs://{BUCKET_2}"
+
+PIPELINE_NAME = os.environ.get("GCP_DATAFUSION_PIPELINE_NAME", "airflow_test")
PIPELINE = {
"name": "test-pipe",
"description": "Data Pipeline Application",
@@ -79,7 +84,7 @@ PIPELINE = {
"encrypted": "false",
"schema": '{"type":"record","name":"etlSchemaBody","fields":'
'[{"name":"offset","type":"long"},{"name":"body","type":"string"}]}',
- "path": BUCKET1,
+ "path": BUCKET_1_URI,
"referenceName": "foo_bucket",
},
},
@@ -111,7 +116,7 @@ PIPELINE = {
"schema": '{"type":"record","name":"etlSchemaBody","fields":'
'[{"name":"offset","type":"long"},{"name":"body","type":"string"}]}',
"referenceName": "bar",
- "path": BUCKET2,
+ "path": BUCKET_2_URI,
},
},
"outputSchema": [
diff --git a/tests/providers/google/cloud/operators/test_datafusion_system.py b/tests/providers/google/cloud/operators/test_datafusion_system.py
index aaddb57..16ddf35 100644
--- a/tests/providers/google/cloud/operators/test_datafusion_system.py
+++ b/tests/providers/google/cloud/operators/test_datafusion_system.py
@@ -17,7 +17,7 @@
# under the License.
import pytest
-from airflow.providers.google.cloud.example_dags.example_datafusion import BUCKET1, BUCKET2
+from airflow.providers.google.cloud.example_dags.example_datafusion import BUCKET_1, BUCKET_2
from tests.providers.google.cloud.utils.gcp_authenticator import GCP_DATAFUSION_KEY
from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
@@ -26,12 +26,12 @@ from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTe
@pytest.mark.credential_file(GCP_DATAFUSION_KEY)
class CloudDataFusionExampleDagsSystemTest(GoogleSystemTest):
def setUp(self) -> None:
- self.create_gcs_bucket(name=BUCKET1)
- self.create_gcs_bucket(name=BUCKET2)
+ self.create_gcs_bucket(name=BUCKET_1)
+ self.create_gcs_bucket(name=BUCKET_2)
def tearDown(self) -> None:
- self.delete_gcs_bucket(name=BUCKET1)
- self.delete_gcs_bucket(name=BUCKET2)
+ self.delete_gcs_bucket(name=BUCKET_1)
+ self.delete_gcs_bucket(name=BUCKET_2)
@provide_gcp_context(GCP_DATAFUSION_KEY)
def test_run_example_dag_function(self):