You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ta...@apache.org on 2023/10/03 07:18:28 UTC

[airflow] branch main updated: Fix Memorystore Memcached system test (#34447)

This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 78f7da46c3 Fix Memorystore Memcached system test (#34447)
78f7da46c3 is described below

commit 78f7da46c39281158923286d6b111cee5d57cf9f
Author: max <42...@users.noreply.github.com>
AuthorDate: Tue Oct 3 09:18:20 2023 +0200

    Fix Memorystore Memcached system test (#34447)
---
 .../example_cloud_memorystore_memcached.py         | 59 +++++++++++++++++++++-
 1 file changed, 58 insertions(+), 1 deletion(-)

diff --git a/tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py b/tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py
index 9e12caf1c4..7a6830e6dc 100644
--- a/tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py
+++ b/tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py
@@ -17,6 +17,11 @@
 # under the License.
 """
 Example Airflow DAG for Google Cloud Memorystore Memcached service.
+
+This DAG relies on the following OS environment variables:
+
+* AIRFLOW__API__GOOGLE_KEY_PATH - Path to service account key file. Note, you can skip this variable if you
+  run this DAG in a Composer environment.
 """
 from __future__ import annotations
 
@@ -26,6 +31,7 @@ from datetime import datetime
 from google.protobuf.field_mask_pb2 import FieldMask
 
 from airflow.models.dag import DAG
+from airflow.operators.bash import BashOperator
 from airflow.providers.google.cloud.operators.cloud_memorystore import (
     CloudMemorystoreMemcachedApplyParametersOperator,
     CloudMemorystoreMemcachedCreateInstanceOperator,
@@ -54,6 +60,51 @@ MEMCACHED_INSTANCE = {
 }
 # [END howto_operator_memcached_instance]
 
+IP_RANGE_NAME = f"ip-range-{DAG_ID}-{ENV_ID}".replace("_", "-")
+NETWORK = "default"
+CREATE_PRIVATE_CONNECTION_CMD = f"""
+if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
+ gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
+fi;
+if [[ $(gcloud compute addresses list --project={PROJECT_ID} --filter="name=('{IP_RANGE_NAME}')") ]]; then \
+  echo "The IP range '{IP_RANGE_NAME}' already exists in the project '{PROJECT_ID}'."; \
+else \
+  echo "  Creating IP range..."; \
+  gcloud compute addresses create "{IP_RANGE_NAME}" \
+    --global \
+    --purpose=VPC_PEERING \
+    --prefix-length=16 \
+    --description="IP range for Memorystore system tests" \
+    --network={NETWORK} \
+    --project={PROJECT_ID}; \
+  echo "Done."; \
+fi;
+if [[ $(gcloud services vpc-peerings list --network={NETWORK} --project={PROJECT_ID}) ]]; then \
+  echo "The private connection already exists in the project '{PROJECT_ID}'."; \
+else \
+  echo "  Creating private connection..."; \
+  gcloud services vpc-peerings connect \
+    --service=servicenetworking.googleapis.com \
+    --ranges={IP_RANGE_NAME} \
+    --network={NETWORK} \
+    --project={PROJECT_ID}; \
+  echo "Done."; \
+fi;
+if [[ $(gcloud services vpc-peerings list \
+        --network={NETWORK} \
+        --project={PROJECT_ID} \
+        --format="value(reservedPeeringRanges)" | grep {IP_RANGE_NAME}) ]]; then \
+  echo "Private service connection configured."; \
+else \
+  echo "  Updating service private connection..."; \
+  gcloud services vpc-peerings update \
+    --service=servicenetworking.googleapis.com \
+    --ranges={IP_RANGE_NAME} \
+    --network={NETWORK} \
+    --project={PROJECT_ID} \
+    --force; \
+fi;
+"""
 
 with DAG(
     DAG_ID,
@@ -62,6 +113,11 @@ with DAG(
     catchup=False,
     tags=["example"],
 ) as dag:
+    create_private_service_connection = BashOperator(
+        task_id="create_private_service_connection",
+        bash_command=CREATE_PRIVATE_CONNECTION_CMD,
+    )
+
     # [START howto_operator_create_instance_memcached]
     create_memcached_instance = CloudMemorystoreMemcachedCreateInstanceOperator(
         task_id="create-instance",
@@ -129,7 +185,8 @@ with DAG(
     # [END howto_operator_update_and_apply_parameters_memcached]
 
     (
-        create_memcached_instance
+        create_private_service_connection
+        >> create_memcached_instance
         >> get_memcached_instance
         >> list_memcached_instances
         >> update_memcached_instance