You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/01/12 01:18:42 UTC

[GitHub] kaxil closed pull request #4489: [AIRFLOW-3676] Add required permission to CloudSQL export/import example

kaxil closed pull request #4489: [AIRFLOW-3676] Add required permission to CloudSQL export/import example
URL: https://github.com/apache/airflow/pull/4489
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub does not otherwise display it once the PR is merged:

diff --git a/airflow/contrib/example_dags/example_gcp_sql.py b/airflow/contrib/example_dags/example_gcp_sql.py
index c6838a2baf..eaf1f7d404 100644
--- a/airflow/contrib/example_dags/example_gcp_sql.py
+++ b/airflow/contrib/example_dags/example_gcp_sql.py
@@ -30,8 +30,6 @@
 
 import os
 
-import re
-
 import airflow
 from airflow import models
 from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceCreateOperator, \
@@ -43,6 +41,8 @@
     GoogleCloudStorageBucketCreateAclEntryOperator, \
     GoogleCloudStorageObjectCreateAclEntryOperator
 
+from six.moves.urllib.parse import urlsplit
+
 # [START howto_operator_cloudsql_arguments]
 PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
 INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'test-mysql')
@@ -175,22 +175,22 @@ def next_dep(task, prev):
     # ############################################## #
 
     # [START howto_operator_cloudsql_create]
-    sql_instance_create = CloudSqlInstanceCreateOperator(
+    sql_instance_create_task = CloudSqlInstanceCreateOperator(
         project_id=PROJECT_ID,
         body=body,
         instance=INSTANCE_NAME,
-        task_id='sql_instance_create'
+        task_id='sql_instance_create_task'
     )
     # [END howto_operator_cloudsql_create]
-    prev_task = sql_instance_create
+    prev_task = sql_instance_create_task
 
-    sql_instance_create_2 = CloudSqlInstanceCreateOperator(
+    sql_instance_create_2_task = CloudSqlInstanceCreateOperator(
         project_id=PROJECT_ID,
         body=body2,
         instance=INSTANCE_NAME2,
-        task_id='sql_instance_create_2'
+        task_id='sql_instance_create_2_task'
     )
-    prev_task = next_dep(sql_instance_create_2, prev_task)
+    prev_task = next_dep(sql_instance_create_2_task, prev_task)
 
     # ############################################## #
     # ### MODIFYING INSTANCE AND ITS DATABASE ###### #
@@ -230,18 +230,19 @@ def next_dep(task, prev):
     # ############################################## #
     # ### EXPORTING SQL FROM INSTANCE 1 ############ #
     # ############################################## #
+    export_url_split = urlsplit(EXPORT_URI)
 
     # For export to work we need to add the Cloud SQL instance's Service Account
     # write access to the destination GCS bucket.
     # [START howto_operator_cloudsql_export_gcs_permissions]
-    sql_gcp_add_bucket_permission = GoogleCloudStorageBucketCreateAclEntryOperator(
-        entity="user-{{ task_instance.xcom_pull('sql_instance_create', key='service_account_email') }}",
+    sql_gcp_add_bucket_permission_task = GoogleCloudStorageBucketCreateAclEntryOperator(
+        entity="user-{{ task_instance.xcom_pull('sql_instance_create_task', key='service_account_email') }}",
         role="WRITER",
-        bucket=re.match(r'gs:\/\/(\S*)\/', EXPORT_URI).group(1),
-        task_id='sql_gcp_add_bucket_permission'
+        bucket=export_url_split[1],  # netloc (bucket)
+        task_id='sql_gcp_add_bucket_permission_task'
     )
     # [END howto_operator_cloudsql_export_gcs_permissions]
-    prev_task = next_dep(sql_gcp_add_bucket_permission, prev_task)
+    prev_task = next_dep(sql_gcp_add_bucket_permission_task, prev_task)
 
     # [START howto_operator_cloudsql_export]
     sql_export_task = CloudSqlInstanceExportOperator(
@@ -256,19 +257,33 @@ def next_dep(task, prev):
     # ############################################## #
     # ### IMPORTING SQL TO INSTANCE 2 ############## #
     # ############################################## #
+    import_url_split = urlsplit(IMPORT_URI)
 
     # For import to work we need to add the Cloud SQL instance's Service Account
     # read access to the target GCS object.
     # [START howto_operator_cloudsql_import_gcs_permissions]
-    sql_gcp_add_object_permission = GoogleCloudStorageObjectCreateAclEntryOperator(
-        entity="user-{{ task_instance.xcom_pull('sql_instance_create_2', key='service_account_email') }}",
+    sql_gcp_add_object_permission_task = GoogleCloudStorageObjectCreateAclEntryOperator(
+        entity="user-{{ task_instance.xcom_pull('sql_instance_create_2_task', "
+               "key='service_account_email') }}",
         role="READER",
-        bucket=re.match(r'gs:\/\/(\S*)\/', IMPORT_URI).group(1),
-        object_name=re.match(r'gs:\/\/[^\/]*\/(\S*)', IMPORT_URI).group(1),
-        task_id='sql_gcp_add_object_permission',
+        bucket=import_url_split[1],  # netloc (bucket)
+        object_name=import_url_split[2][1:],  # path (strip first '/')
+        task_id='sql_gcp_add_object_permission_task',
+    )
+    prev_task = next_dep(sql_gcp_add_object_permission_task, prev_task)
+
+    # For import to work we also need to add the Cloud SQL instance's Service Account
+    # write access to the whole bucket.
+    sql_gcp_add_bucket_permission_2_task = GoogleCloudStorageBucketCreateAclEntryOperator(
+        entity="user-{{ task_instance.xcom_pull("
+               "'sql_instance_create_2_task', key='service_account_email') "
+               "}}",
+        role="WRITER",
+        bucket=import_url_split[1],  # netloc
+        task_id='sql_gcp_add_bucket_permission_2_task',
     )
     # [END howto_operator_cloudsql_import_gcs_permissions]
-    prev_task = next_dep(sql_gcp_add_object_permission, prev_task)
+    prev_task = next_dep(sql_gcp_add_bucket_permission_2_task, prev_task)
 
     # [START howto_operator_cloudsql_import]
     sql_import_task = CloudSqlInstanceImportOperator(
@@ -307,9 +322,9 @@ def next_dep(task, prev):
     # [END howto_operator_cloudsql_delete]
     prev_task = next_dep(sql_instance_delete_task, prev_task)
 
-    sql_instance_delete_task_2 = CloudSqlInstanceDeleteOperator(
+    sql_instance_delete_2_task = CloudSqlInstanceDeleteOperator(
         project_id=PROJECT_ID,
         instance=INSTANCE_NAME2,
-        task_id='sql_instance_delete_task_2'
+        task_id='sql_instance_delete_2_task'
     )
-    prev_task = next_dep(sql_instance_delete_task_2, prev_task)
+    prev_task = next_dep(sql_instance_delete_2_task, prev_task)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services