You are viewing a plain text version of this content. The canonical link for it was included in the original message but is not reproduced in this plain-text rendering.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/04/25 18:36:33 UTC

[airflow] branch main updated (8cfb2be989 -> d9e7b6a940)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


    from 8cfb2be989 Add doc and example dag for AWS Step Functions Operators
     new 43ded6c877 Add links for Cloud Datastore operators
     new b3cc2f5d10 Fix pre-commit check
     new 544d658921 Change CloudDatastoreExportEntitiesLink to StorageLink
     new d9e7b6a940 Update unit tests for Datastore operators

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../cloud/links/{dataplex.py => datastore.py}      | 41 ++++++++++------------
 .../providers/google/cloud/operators/datastore.py  | 17 +++++++++
 airflow/providers/google/provider.yaml             |  2 ++
 .../google/cloud/operators/test_datastore.py       | 14 +++++---
 4 files changed, 47 insertions(+), 27 deletions(-)
 copy airflow/providers/google/cloud/links/{dataplex.py => datastore.py} (58%)


[airflow] 02/04: Fix pre-commit check

Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b3cc2f5d102214067c40b3c120364918556e7cca
Author: MaksYermak <ma...@gmail.com>
AuthorDate: Tue Apr 19 14:28:44 2022 +0200

    Fix pre-commit check
---
 airflow/providers/google/cloud/links/datastore.py     | 2 +-
 airflow/providers/google/cloud/operators/datastore.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/google/cloud/links/datastore.py b/airflow/providers/google/cloud/links/datastore.py
index c8066f0246..0416896edd 100644
--- a/airflow/providers/google/cloud/links/datastore.py
+++ b/airflow/providers/google/cloud/links/datastore.py
@@ -94,4 +94,4 @@ class CloudDatastoreEntitiesLink(BaseGoogleLink):
             value={
                 "project_id": task_instance.project_id,
             },
-        )
\ No newline at end of file
+        )
diff --git a/airflow/providers/google/cloud/operators/datastore.py b/airflow/providers/google/cloud/operators/datastore.py
index 4f24ba4420..a654e3126c 100644
--- a/airflow/providers/google/cloud/operators/datastore.py
+++ b/airflow/providers/google/cloud/operators/datastore.py
@@ -24,9 +24,9 @@ from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.datastore import DatastoreHook
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
 from airflow.providers.google.cloud.links.datastore import (
+    CloudDatastoreEntitiesLink,
     CloudDatastoreExportEntitiesLink,
     CloudDatastoreImportExportLink,
-    CloudDatastoreEntitiesLink,
 )
 
 if TYPE_CHECKING:


[airflow] 03/04: Change CloudDatastoreExportEntitiesLink to StorageLink

Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 544d658921cb2c9ac1abc5a89f2e275255cb924b
Author: MaksYermak <ma...@gmail.com>
AuthorDate: Tue Apr 19 15:49:04 2022 +0200

    Change CloudDatastoreExportEntitiesLink to StorageLink
---
 airflow/providers/google/cloud/links/datastore.py  | 24 ----------------------
 .../providers/google/cloud/operators/datastore.py  |  8 ++++----
 airflow/providers/google/provider.yaml             |  1 -
 3 files changed, 4 insertions(+), 29 deletions(-)

diff --git a/airflow/providers/google/cloud/links/datastore.py b/airflow/providers/google/cloud/links/datastore.py
index 0416896edd..d17a6a8ef0 100644
--- a/airflow/providers/google/cloud/links/datastore.py
+++ b/airflow/providers/google/cloud/links/datastore.py
@@ -52,30 +52,6 @@ class CloudDatastoreImportExportLink(BaseGoogleLink):
         )
 
 
-class CloudDatastoreExportEntitiesLink(BaseGoogleLink):
-    """Helper class for constructing Cloud Datastore Export Entities Link"""
-
-    name = "Export Entities"
-    key = "export_conf"
-    format_str = DATASTORE_EXPORT_ENTITIES_LINK
-
-    @staticmethod
-    def persist(
-        context: "Context",
-        task_instance,
-        output_url: str,
-    ):
-        task_instance.xcom_push(
-            context=context,
-            key=CloudDatastoreExportEntitiesLink.key,
-            value={
-                "project_id": task_instance.project_id,
-                "bucket_name": task_instance.bucket,
-                "export_name": output_url.split('/')[3],
-            },
-        )
-
-
 class CloudDatastoreEntitiesLink(BaseGoogleLink):
     """Helper class for constructing Cloud Datastore Entities Link"""
 
diff --git a/airflow/providers/google/cloud/operators/datastore.py b/airflow/providers/google/cloud/operators/datastore.py
index a654e3126c..4d168cb2e1 100644
--- a/airflow/providers/google/cloud/operators/datastore.py
+++ b/airflow/providers/google/cloud/operators/datastore.py
@@ -25,9 +25,9 @@ from airflow.providers.google.cloud.hooks.datastore import DatastoreHook
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
 from airflow.providers.google.cloud.links.datastore import (
     CloudDatastoreEntitiesLink,
-    CloudDatastoreExportEntitiesLink,
     CloudDatastoreImportExportLink,
 )
+from airflow.providers.google.common.links.storage import StorageLink
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
@@ -78,7 +78,7 @@ class CloudDatastoreExportEntitiesOperator(BaseOperator):
         'labels',
         'impersonation_chain',
     )
-    operator_extra_links = (CloudDatastoreExportEntitiesLink(),)
+    operator_extra_links = (StorageLink(),)
 
     def __init__(
         self,
@@ -138,10 +138,10 @@ class CloudDatastoreExportEntitiesOperator(BaseOperator):
         state = result['metadata']['common']['state']
         if state != 'SUCCESSFUL':
             raise AirflowException(f'Operation failed: result={result}')
-        CloudDatastoreExportEntitiesLink.persist(
+        StorageLink.persist(
             context=context,
             task_instance=self,
-            output_url=result['response']['outputUrl'],
+            uri=f"{self.bucket}/{result['response']['outputUrl'].split('/')[3]}",
         )
         return result
 
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 4ef112ead9..b1ef488269 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -905,7 +905,6 @@ extra-links:
   - airflow.providers.google.cloud.operators.cloud_composer.CloudComposerEnvironmentsLink
   - airflow.providers.google.cloud.links.dataflow.DataflowJobLink
   - airflow.providers.google.cloud.links.datastore.CloudDatastoreImportExportLink
-  - airflow.providers.google.cloud.links.datastore.CloudDatastoreExportEntitiesLink
   - airflow.providers.google.cloud.links.datastore.CloudDatastoreEntitiesLink
   - airflow.providers.google.common.links.storage.StorageLink
 


[airflow] 01/04: Add links for Cloud Datastore operators

Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 43ded6c877e90f53e4f5849499a6ed9585b309b7
Author: MaksYermak <ma...@gmail.com>
AuthorDate: Tue Apr 19 13:36:38 2022 +0200

    Add links for Cloud Datastore operators
---
 airflow/providers/google/cloud/links/datastore.py  | 97 ++++++++++++++++++++++
 .../providers/google/cloud/operators/datastore.py  | 17 ++++
 airflow/providers/google/provider.yaml             |  3 +
 3 files changed, 117 insertions(+)

diff --git a/airflow/providers/google/cloud/links/datastore.py b/airflow/providers/google/cloud/links/datastore.py
new file mode 100644
index 0000000000..c8066f0246
--- /dev/null
+++ b/airflow/providers/google/cloud/links/datastore.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import TYPE_CHECKING
+
+from airflow.providers.google.cloud.links.base import BaseGoogleLink
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+BASE_LINK = "https://console.cloud.google.com"
+DATASTORE_BASE_LINK = BASE_LINK + "/datastore"
+DATASTORE_IMPORT_EXPORT_LINK = DATASTORE_BASE_LINK + "/import-export?project={project_id}"
+DATASTORE_EXPORT_ENTITIES_LINK = (
+    BASE_LINK + "/storage/browser/{bucket_name}/{export_name}?project={project_id}"
+)
+DATASTORE_ENTITIES_LINK = DATASTORE_BASE_LINK + "/entities/query/kind?project={project_id}"
+
+
+class CloudDatastoreImportExportLink(BaseGoogleLink):
+    """Helper class for constructing Cloud Datastore Import/Export Link"""
+
+    name = "Import/Export Page"
+    key = "import_export_conf"
+    format_str = DATASTORE_IMPORT_EXPORT_LINK
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance,
+    ):
+        task_instance.xcom_push(
+            context=context,
+            key=CloudDatastoreImportExportLink.key,
+            value={
+                "project_id": task_instance.project_id,
+            },
+        )
+
+
+class CloudDatastoreExportEntitiesLink(BaseGoogleLink):
+    """Helper class for constructing Cloud Datastore Export Entities Link"""
+
+    name = "Export Entities"
+    key = "export_conf"
+    format_str = DATASTORE_EXPORT_ENTITIES_LINK
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance,
+        output_url: str,
+    ):
+        task_instance.xcom_push(
+            context=context,
+            key=CloudDatastoreExportEntitiesLink.key,
+            value={
+                "project_id": task_instance.project_id,
+                "bucket_name": task_instance.bucket,
+                "export_name": output_url.split('/')[3],
+            },
+        )
+
+
+class CloudDatastoreEntitiesLink(BaseGoogleLink):
+    """Helper class for constructing Cloud Datastore Entities Link"""
+
+    name = "Entities"
+    key = "entities_conf"
+    format_str = DATASTORE_ENTITIES_LINK
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance,
+    ):
+        task_instance.xcom_push(
+            context=context,
+            key=CloudDatastoreEntitiesLink.key,
+            value={
+                "project_id": task_instance.project_id,
+            },
+        )
\ No newline at end of file
diff --git a/airflow/providers/google/cloud/operators/datastore.py b/airflow/providers/google/cloud/operators/datastore.py
index 8282057518..4f24ba4420 100644
--- a/airflow/providers/google/cloud/operators/datastore.py
+++ b/airflow/providers/google/cloud/operators/datastore.py
@@ -23,6 +23,11 @@ from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.datastore import DatastoreHook
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
+from airflow.providers.google.cloud.links.datastore import (
+    CloudDatastoreExportEntitiesLink,
+    CloudDatastoreImportExportLink,
+    CloudDatastoreEntitiesLink,
+)
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
@@ -73,6 +78,7 @@ class CloudDatastoreExportEntitiesOperator(BaseOperator):
         'labels',
         'impersonation_chain',
     )
+    operator_extra_links = (CloudDatastoreExportEntitiesLink(),)
 
     def __init__(
         self,
@@ -132,6 +138,11 @@ class CloudDatastoreExportEntitiesOperator(BaseOperator):
         state = result['metadata']['common']['state']
         if state != 'SUCCESSFUL':
             raise AirflowException(f'Operation failed: result={result}')
+        CloudDatastoreExportEntitiesLink.persist(
+            context=context,
+            task_instance=self,
+            output_url=result['response']['outputUrl'],
+        )
         return result
 
 
@@ -179,6 +190,7 @@ class CloudDatastoreImportEntitiesOperator(BaseOperator):
         'labels',
         'impersonation_chain',
     )
+    operator_extra_links = (CloudDatastoreImportExportLink(),)
 
     def __init__(
         self,
@@ -231,6 +243,7 @@ class CloudDatastoreImportEntitiesOperator(BaseOperator):
         if state != 'SUCCESSFUL':
             raise AirflowException(f'Operation failed: result={result}')
 
+        CloudDatastoreImportExportLink.persist(context=context, task_instance=self)
         return result
 
 
@@ -265,6 +278,7 @@ class CloudDatastoreAllocateIdsOperator(BaseOperator):
         "partial_keys",
         "impersonation_chain",
     )
+    operator_extra_links = (CloudDatastoreEntitiesLink(),)
 
     def __init__(
         self,
@@ -293,6 +307,7 @@ class CloudDatastoreAllocateIdsOperator(BaseOperator):
             partial_keys=self.partial_keys,
             project_id=self.project_id,
         )
+        CloudDatastoreEntitiesLink.persist(context=context, task_instance=self)
         return keys
 
 
@@ -389,6 +404,7 @@ class CloudDatastoreCommitOperator(BaseOperator):
         "body",
         "impersonation_chain",
     )
+    operator_extra_links = (CloudDatastoreEntitiesLink(),)
 
     def __init__(
         self,
@@ -417,6 +433,7 @@ class CloudDatastoreCommitOperator(BaseOperator):
             body=self.body,
             project_id=self.project_id,
         )
+        CloudDatastoreEntitiesLink.persist(context=context, task_instance=self)
         return response
 
 
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 514b4432db..4ef112ead9 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -904,6 +904,9 @@ extra-links:
   - airflow.providers.google.cloud.operators.cloud_composer.CloudComposerEnvironmentLink
   - airflow.providers.google.cloud.operators.cloud_composer.CloudComposerEnvironmentsLink
   - airflow.providers.google.cloud.links.dataflow.DataflowJobLink
+  - airflow.providers.google.cloud.links.datastore.CloudDatastoreImportExportLink
+  - airflow.providers.google.cloud.links.datastore.CloudDatastoreExportEntitiesLink
+  - airflow.providers.google.cloud.links.datastore.CloudDatastoreEntitiesLink
   - airflow.providers.google.common.links.storage.StorageLink
 
 additional-extras:


[airflow] 04/04: Update unit tests for Datastore operators

Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d9e7b6a940e3a5a84eee1c97ca32ef5fd4d22f42
Author: MaksYermak <ma...@gmail.com>
AuthorDate: Wed Apr 20 13:07:17 2022 +0200

    Update unit tests for Datastore operators
---
 tests/providers/google/cloud/operators/test_datastore.py | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/tests/providers/google/cloud/operators/test_datastore.py b/tests/providers/google/cloud/operators/test_datastore.py
index ddc59a5135..75f3d2ac94 100644
--- a/tests/providers/google/cloud/operators/test_datastore.py
+++ b/tests/providers/google/cloud/operators/test_datastore.py
@@ -35,6 +35,7 @@ CONN_ID = "test-gcp-conn-id"
 BODY = {"key", "value"}
 TRANSACTION = "transaction-name"
 BUCKET = "gs://test-bucket"
+OUTPUT_URL = f"{BUCKET}/entities_export_name/entities_export_name.overall_export_metadata"
 FILE = "filename"
 OPERATION_ID = "1234"
 
@@ -44,7 +45,10 @@ class TestCloudDatastoreExportEntitiesOperator:
     def test_execute(self, mock_hook):
         mock_hook.return_value.export_to_storage_bucket.return_value = {"name": OPERATION_ID}
         mock_hook.return_value.poll_operation_until_done.return_value = {
-            "metadata": {"common": {"state": "SUCCESSFUL"}}
+            "metadata": {"common": {"state": "SUCCESSFUL"}},
+            "response": {
+                "outputUrl": OUTPUT_URL,
+            },
         }
 
         op = CloudDatastoreExportEntitiesOperator(
@@ -53,7 +57,7 @@ class TestCloudDatastoreExportEntitiesOperator:
             project_id=PROJECT_ID,
             bucket=BUCKET,
         )
-        op.execute({})
+        op.execute(context={'ti': mock.MagicMock()})
 
         mock_hook.assert_called_once_with(CONN_ID, None, impersonation_chain=None)
         mock_hook.return_value.export_to_storage_bucket.assert_called_once_with(
@@ -82,7 +86,7 @@ class TestCloudDatastoreImportEntitiesOperator:
             bucket=BUCKET,
             file=FILE,
         )
-        op.execute({})
+        op.execute(context={'ti': mock.MagicMock()})
 
         mock_hook.assert_called_once_with(CONN_ID, None, impersonation_chain=None)
         mock_hook.return_value.import_from_storage_bucket.assert_called_once_with(
@@ -107,7 +111,7 @@ class TestCloudDatastoreAllocateIds:
             project_id=PROJECT_ID,
             partial_keys=partial_keys,
         )
-        op.execute({})
+        op.execute(context={'ti': mock.MagicMock()})
 
         mock_hook.assert_called_once_with(gcp_conn_id=CONN_ID, impersonation_chain=None)
         mock_hook.return_value.allocate_ids.assert_called_once_with(
@@ -138,7 +142,7 @@ class TestCloudDatastoreCommit:
         op = CloudDatastoreCommitOperator(
             task_id="test_task", gcp_conn_id=CONN_ID, project_id=PROJECT_ID, body=BODY
         )
-        op.execute({})
+        op.execute(context={'ti': mock.MagicMock()})
 
         mock_hook.assert_called_once_with(gcp_conn_id=CONN_ID, impersonation_chain=None)
         mock_hook.return_value.commit.assert_called_once_with(project_id=PROJECT_ID, body=BODY)