You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/13 08:34:17 UTC

[GitHub] [airflow] feluelle commented on a change in pull request #13072: AWS Glue Crawler Integration

feluelle commented on a change in pull request #13072:
URL: https://github.com/apache/airflow/pull/13072#discussion_r553260972



##########
File path: airflow/providers/amazon/aws/hooks/glue_crawler.py
##########
@@ -0,0 +1,169 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+
+from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class AwsGlueCrawlerHook(AwsBaseHook):
+    """
+    Interacts with AWS Glue Crawler
+    :param config = Configurations for the AWS Glue crawler
+    :type config = dict

Review comment:
       ```suggestion
   ```
   ?

##########
File path: airflow/providers/amazon/aws/hooks/glue_crawler.py
##########
@@ -0,0 +1,169 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+
+from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class AwsGlueCrawlerHook(AwsBaseHook):
+    """
+    Interacts with AWS Glue Crawler
+    :param config = Configurations for the AWS Glue crawler
+    :type config = dict
+    """
+
+    def __init__(self, *args, **kwargs):
+        kwargs['client_type'] = 'glue'
+        super().__init__(*args, **kwargs)
+
+    @cached_property
+    def glue_client(self):
+        """:return: AWS Glue client"""
+        return self.get_conn()
+
+    def check_iam_role(self, role_name: str) -> str:
+        """
+        Checks if the input IAM role name is a
+        valid pre-existing role within the caller's AWS account.
+        Is needed because the current Boto3 (<=1.16.46)
+        glue client create_crawler() method misleadingly catches
+        non-existing role as a role trust policy error.
+        :param role_name = IAM role name

Review comment:
       ```suggestion
           non-existing role as a role trust policy error.
   
           :param role_name = IAM role name
   ```
   Please put an empty line between description and params.

##########
File path: airflow/providers/amazon/aws/hooks/glue_crawler.py
##########
@@ -0,0 +1,169 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+
+from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class AwsGlueCrawlerHook(AwsBaseHook):
+    """
+    Interacts with AWS Glue Crawler
+    :param config = Configurations for the AWS Glue crawler
+    :type config = dict
+    """
+
+    def __init__(self, *args, **kwargs):
+        kwargs['client_type'] = 'glue'
+        super().__init__(*args, **kwargs)
+
+    @cached_property
+    def glue_client(self):
+        """:return: AWS Glue client"""
+        return self.get_conn()
+
+    def check_iam_role(self, role_name: str) -> str:
+        """
+        Checks if the input IAM role name is a
+        valid pre-existing role within the caller's AWS account.
+        Is needed because the current Boto3 (<=1.16.46)
+        glue client create_crawler() method misleadingly catches
+        non-existing role as a role trust policy error.
+        :param role_name = IAM role name

Review comment:
       Please also check for this in the rest of your code. Thanks :)

##########
File path: airflow/providers/amazon/aws/hooks/glue_crawler.py
##########
@@ -0,0 +1,169 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+
+from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class AwsGlueCrawlerHook(AwsBaseHook):
+    """
+    Interacts with AWS Glue Crawler
+    :param config = Configurations for the AWS Glue crawler
+    :type config = dict
+    """
+
+    def __init__(self, *args, **kwargs):
+        kwargs['client_type'] = 'glue'
+        super().__init__(*args, **kwargs)
+
+    @cached_property
+    def glue_client(self):
+        """:return: AWS Glue client"""
+        return self.get_conn()
+
+    def check_iam_role(self, role_name: str) -> str:
+        """
+        Checks if the input IAM role name is a
+        valid pre-existing role within the caller's AWS account.
+        Is needed because the current Boto3 (<=1.16.46)
+        glue client create_crawler() method misleadingly catches
+        non-existing role as a role trust policy error.
+        :param role_name = IAM role name
+        :type role_name = str
+        :return: IAM role name
+        """
+        iam_client = self.get_client_type('iam', self.region_name)
+
+        iam_client.get_role(RoleName=role_name)
+
+    def get_or_create_crawler(self, **crawler_kwargs) -> str:
+        """
+        Creates the crawler if the crawler doesn't exists and returns the crawler name
+
+        :param crawler_kwargs = Keyword args that define the configurations used to create/update the crawler
+        :type crawler_kwargs = any
+        :return: Name of the crawler
+        """
+        crawler_name = crawler_kwargs['Name']
+        try:
+            glue_response = self.glue_client.get_crawler(Name=crawler_name)
+            self.log.info("Crawler %s already exists; updating crawler", crawler_name)
+            self.glue_client.update_crawler(**crawler_kwargs)
+            return glue_response['Crawler']['Name']
+        except self.glue_client.exceptions.EntityNotFoundException:
+            self.log.info("Creating AWS Glue crawler %s", crawler_name)
+            try:
+                glue_response = self.glue_client.create_crawler(**crawler_kwargs)
+                return glue_response['Crawler']['Name']
+            except self.glue_client.exceptions.InvalidInputException as general_error:
+                self.check_iam_role(crawler_kwargs['Role'])
+                raise AirflowException(general_error)

Review comment:
       Can you maybe move this into a separate `create_crawler` function?
   
   I would also move the get-or-create logic to the operator level, so that in `execute` you would do:
   ```python
   if not self.hook.has_crawler(**self.config):
       self.hook.create_crawler(**self.config)
   crawler_name = self.hook.get_crawler(**self.config)
   ```
   where `has_crawler` tries to get the crawler and handles the exception raised when it does not exist, i.e. it returns False in that case.
   
   WDYT?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org