You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ta...@apache.org on 2023/10/03 07:18:28 UTC
[airflow] branch main updated: Fix Memorystore Memcached system test (#34447)
This is an automated email from the ASF dual-hosted git repository.
taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 78f7da46c3 Fix Memorystore Memcached system test (#34447)
78f7da46c3 is described below
commit 78f7da46c39281158923286d6b111cee5d57cf9f
Author: max <42...@users.noreply.github.com>
AuthorDate: Tue Oct 3 09:18:20 2023 +0200
Fix Memorystore Memcached system test (#34447)
---
.../example_cloud_memorystore_memcached.py | 59 +++++++++++++++++++++-
1 file changed, 58 insertions(+), 1 deletion(-)
diff --git a/tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py b/tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py
index 9e12caf1c4..7a6830e6dc 100644
--- a/tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py
+++ b/tests/system/providers/google/cloud/cloud_memorystore/example_cloud_memorystore_memcached.py
@@ -17,6 +17,11 @@
# under the License.
"""
Example Airflow DAG for Google Cloud Memorystore Memcached service.
+
+This DAG relies on the following OS environment variables
+
+* AIRFLOW__API__GOOGLE_KEY_PATH - Path to service account key file. Note, you can skip this variable if you
+ run this DAG in a Composer environment.
"""
from __future__ import annotations
@@ -26,6 +31,7 @@ from datetime import datetime
from google.protobuf.field_mask_pb2 import FieldMask
from airflow.models.dag import DAG
+from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.cloud_memorystore import (
CloudMemorystoreMemcachedApplyParametersOperator,
CloudMemorystoreMemcachedCreateInstanceOperator,
@@ -54,6 +60,51 @@ MEMCACHED_INSTANCE = {
}
# [END howto_operator_memcached_instance]
+IP_RANGE_NAME = f"ip-range-{DAG_ID}-{ENV_ID}".replace("_", "-")
+NETWORK = "default"
+CREATE_PRIVATE_CONNECTION_CMD = f"""
+if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
+ gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
+fi;
+if [[ $(gcloud compute addresses list --project={PROJECT_ID} --filter="name=('{IP_RANGE_NAME}')") ]]; then \
+ echo "The IP range '{IP_RANGE_NAME}' already exists in the project '{PROJECT_ID}'."; \
+else \
+ echo " Creating IP range..."; \
+ gcloud compute addresses create "{IP_RANGE_NAME}" \
+ --global \
+ --purpose=VPC_PEERING \
+ --prefix-length=16 \
+ --description="IP range for Memorystore system tests" \
+ --network={NETWORK} \
+ --project={PROJECT_ID}; \
+ echo "Done."; \
+fi;
+if [[ $(gcloud services vpc-peerings list --network={NETWORK} --project={PROJECT_ID}) ]]; then \
+ echo "The private connection already exists in the project '{PROJECT_ID}'."; \
+else \
+ echo " Creating private connection..."; \
+ gcloud services vpc-peerings connect \
+ --service=servicenetworking.googleapis.com \
+ --ranges={IP_RANGE_NAME} \
+ --network={NETWORK} \
+ --project={PROJECT_ID}; \
+ echo "Done."; \
+fi;
+if [[ $(gcloud services vpc-peerings list \
+ --network={NETWORK} \
+ --project={PROJECT_ID} \
+ --format="value(reservedPeeringRanges)" | grep {IP_RANGE_NAME}) ]]; then \
+ echo "Private service connection configured."; \
+else \
+ echo " Updating service private connection..."; \
+ gcloud services vpc-peerings update \
+ --service=servicenetworking.googleapis.com \
+ --ranges={IP_RANGE_NAME} \
+ --network={NETWORK} \
+ --project={PROJECT_ID} \
+ --force; \
+fi;
+"""
with DAG(
DAG_ID,
@@ -62,6 +113,11 @@ with DAG(
catchup=False,
tags=["example"],
) as dag:
+ create_private_service_connection = BashOperator(
+ task_id="create_private_service_connection",
+ bash_command=CREATE_PRIVATE_CONNECTION_CMD,
+ )
+
# [START howto_operator_create_instance_memcached]
create_memcached_instance = CloudMemorystoreMemcachedCreateInstanceOperator(
task_id="create-instance",
@@ -129,7 +185,8 @@ with DAG(
# [END howto_operator_update_and_apply_parameters_memcached]
(
- create_memcached_instance
+ create_private_service_connection
+ >> create_memcached_instance
>> get_memcached_instance
>> list_memcached_instances
>> update_memcached_instance