You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/12/23 18:23:43 UTC

[GitHub] [airflow] dstandish commented on a change in pull request #13072: AWS Glue Crawler Integration

dstandish commented on a change in pull request #13072:
URL: https://github.com/apache/airflow/pull/13072#discussion_r548085121



##########
File path: airflow/providers/amazon/aws/hooks/glue_crawler.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+from typing import Dict, List
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class AwsGlueCrawlerHook(AwsBaseHook):
+    """
+    Interacts with AWS Glue Crawler
+    :param crawler_name = Unique crawler name per AWS Account

Review comment:
       generally i think hook init params are focused on authentication and connection behavior
   
   i think it might make sense to move most of these kwargs to the `get_or_create_crawler` method
   
   sort of like how with s3hook you don't provide key and bucket at init, or with sql hook you don't provide sql at init -- but you provide methods for interacting with the service.
   
   then you could instantiate a hook once and use it to deal with multiple crawlers.  
   
   then, re your slack question about too many params,  perhaps the method becomes `get_or_create_crawler(**crawler_kwargs)`
   

##########
File path: airflow/providers/amazon/aws/hooks/glue_crawler.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+from typing import Dict, List
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class AwsGlueCrawlerHook(AwsBaseHook):
+    """
+    Interacts with AWS Glue Crawler
+    :param crawler_name = Unique crawler name per AWS Account
+    :type crawler_name = Optional[str]
+    :param crawler_desc = Crawler description
+    :type crawler_desc = Optional[str]
+    :param glue_db_name = AWS glue catalog database ID
+    :type glue_db_name = Optional[str]
+    :param iam_role_name = AWS IAM role for glue crawler
+    :type iam_role_name = Optional[str]
+    :param region_name = AWS region name (e.g. 'us-west-2')
+    :type region_name = Optional[str]
+    :param s3_target_configuration = Configurations for crawling AWS S3 paths
+    :type s3_target_configuration = Optional[list]
+    :param jdbc_target_configuration = Configurations for crawling JDBC paths
+    :type jdbc_target_configuration = Optional[list]
+    :param mongo_target_configuration = Configurations for crawling AWS DocumentDB or MongoDB
+    :type mongo_target_configuration = Optional[list]
+    :param dynamo_target_configuration = Configurations for crawling AWS DynamoDB
+    :type dynamo_target_configuration = Optional[list]
+    :param glue_catalog_target_configuration = Configurations for crawling AWS Glue CatalogDB
+    :type glue_catalog_target_configuration = Optional[list]
+    :param cron_schedule = Cron expression used to define the crawler schedule (e.g. cron(11 18 * ? * *))
+    :type cron_schedule = Optional[str]
+    :param classifiers = List of user defined custom classifiers to be used by the crawler
+    :type classifiers = Optional[list]
+    :param table_prefix = Prefix for catalog table to be created
+    :type table_prefix = Optional[str]
+    :param update_behavior = Behavior when the crawler identifies schema changes
+    :type update_behavior = Optional[str]
+    :param delete_behavior = Behavior when the crawler identifies deleted objects
+    :type delete_behavior = Optional[str]
+    :param recrawl_behavior = Behavior when the crawler needs to crawl again
+    :type recrawl_behavior = Optional[str]
+    :param lineage_settings = Enables or disables data lineage
+    :type lineage_settings = Optional[str]
+    :param json_configuration = Versioned JSON configuration for the crawler
+    :type json_configuration = Optional[str]
+    :param security_configuration = Name of the security configuration structure to be used by the crawler.
+    :type security_configuration = Optional[str]
+    :param tags = Tags to attach to the crawler request
+    :type tags = Optional[dict]
+    :param overwrite = Determines if crawler should be updated if the crawler configuration change
+    :type overwrite = Optional[bool]
+    """
+
+    CRAWLER_POLL_INTERVAL = 6  # polls crawler status after every CRAWLER_POLL_INTERVAL seconds
+
+    def __init__(
+        self,
+        crawler_name=None,
+        crawler_desc=None,
+        glue_db_name=None,
+        iam_role_name=None,
+        s3_targets_configuration=None,
+        jdbc_targets_configuration=None,
+        mongo_targets_configuration=None,
+        dynamo_targets_configuration=None,
+        glue_catalog_targets_configuration=None,
+        cron_schedule=None,
+        classifiers=None,
+        table_prefix=None,
+        update_behavior=None,
+        delete_behavior=None,
+        recrawl_behavior=None,
+        lineage_settings=None,
+        json_configuration=None,
+        security_configuration=None,
+        tags=None,
+        overwrite=False,
+        *args,
+        **kwargs,
+    ):
+
+        self.crawler_name = crawler_name
+        self.crawler_desc = crawler_desc
+        self.glue_db_name = glue_db_name
+        self.iam_role_name = iam_role_name
+        self.s3_targets_configuration = s3_targets_configuration
+        self.jdbc_targets_configuration = jdbc_targets_configuration
+        self.mongo_targets_configuration = mongo_targets_configuration
+        self.dynamo_targets_configuration = dynamo_targets_configuration
+        self.glue_catalog_targets_configuration = glue_catalog_targets_configuration
+        self.cron_schedule = cron_schedule
+        self.classifiers = classifiers
+        self.table_prefix = table_prefix
+        self.update_behavior = update_behavior
+        self.delete_behavior = delete_behavior
+        self.recrawl_behavior = recrawl_behavior
+        self.lineage_settings = lineage_settings
+        self.json_configuration = json_configuration
+        self.security_configuration = security_configuration
+        self.tags = tags
+        self.overwrite = overwrite
+        kwargs['client_type'] = 'glue'
+        super().__init__(*args, **kwargs)
+
+    def list_crawlers(self) -> List:
+        """:return: Lists of Crawlers"""
+        conn = self.get_conn()
+        return conn.get_crawlers()
+
+    def get_iam_execution_role(self) -> Dict:
+        """:return: iam role for crawler execution"""
+        iam_client = self.get_client_type('iam', self.region_name)
+
+        try:
+            glue_execution_role = iam_client.get_role(RoleName=self.iam_role_name)
+            self.log.info("Iam Role Name: %s", self.iam_role_name)
+            return glue_execution_role
+        except Exception as general_error:
+            self.log.error("Failed to create aws glue crawler, error: %s", general_error)
+            raise
+
+    def initialize_crawler(self):
+        """
+        Initializes connection with AWS Glue to run crawler
+        :return:
+        """
+        glue_client = self.get_conn()

Review comment:
       one thing i like to do in hooks is create a cached property such as `client` or `glue_client` or in other hooks `session`, so that you don't need to remember to call `get_conn` in every case, and you can just reuse the same authenticated object without reauthenticating.
   
   e.g. 
   
   ```python
   @cached_property
   def glue_client(self):
       return self.get_conn()
   ```

##########
File path: airflow/providers/amazon/aws/hooks/glue_crawler.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+from typing import Dict, List
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class AwsGlueCrawlerHook(AwsBaseHook):
+    """
+    Interacts with AWS Glue Crawler
+    :param crawler_name = Unique crawler name per AWS Account
+    :type crawler_name = Optional[str]
+    :param crawler_desc = Crawler description
+    :type crawler_desc = Optional[str]
+    :param glue_db_name = AWS glue catalog database ID
+    :type glue_db_name = Optional[str]
+    :param iam_role_name = AWS IAM role for glue crawler
+    :type iam_role_name = Optional[str]
+    :param region_name = AWS region name (e.g. 'us-west-2')
+    :type region_name = Optional[str]
+    :param s3_target_configuration = Configurations for crawling AWS S3 paths
+    :type s3_target_configuration = Optional[list]
+    :param jdbc_target_configuration = Configurations for crawling JDBC paths
+    :type jdbc_target_configuration = Optional[list]
+    :param mongo_target_configuration = Configurations for crawling AWS DocumentDB or MongoDB
+    :type mongo_target_configuration = Optional[list]
+    :param dynamo_target_configuration = Configurations for crawling AWS DynamoDB
+    :type dynamo_target_configuration = Optional[list]
+    :param glue_catalog_target_configuration = Configurations for crawling AWS Glue CatalogDB
+    :type glue_catalog_target_configuration = Optional[list]
+    :param cron_schedule = Cron expression used to define the crawler schedule (e.g. cron(11 18 * ? * *))
+    :type cron_schedule = Optional[str]
+    :param classifiers = List of user defined custom classifiers to be used by the crawler
+    :type classifiers = Optional[list]
+    :param table_prefix = Prefix for catalog table to be created
+    :type table_prefix = Optional[str]
+    :param update_behavior = Behavior when the crawler identifies schema changes
+    :type update_behavior = Optional[str]
+    :param delete_behavior = Behavior when the crawler identifies deleted objects
+    :type delete_behavior = Optional[str]
+    :param recrawl_behavior = Behavior when the crawler needs to crawl again
+    :type recrawl_behavior = Optional[str]
+    :param lineage_settings = Enables or disables data lineage
+    :type lineage_settings = Optional[str]
+    :param json_configuration = Versioned JSON configuration for the crawler
+    :type json_configuration = Optional[str]
+    :param security_configuration = Name of the security configuration structure to be used by the crawler.
+    :type security_configuration = Optional[str]
+    :param tags = Tags to attach to the crawler request
+    :type tags = Optional[dict]
+    :param overwrite = Determines if crawler should be updated if the crawler configuration change
+    :type overwrite = Optional[bool]
+    """
+
+    CRAWLER_POLL_INTERVAL = 6  # polls crawler status after every CRAWLER_POLL_INTERVAL seconds
+
+    def __init__(
+        self,
+        crawler_name=None,
+        crawler_desc=None,
+        glue_db_name=None,
+        iam_role_name=None,
+        s3_targets_configuration=None,
+        jdbc_targets_configuration=None,
+        mongo_targets_configuration=None,
+        dynamo_targets_configuration=None,
+        glue_catalog_targets_configuration=None,
+        cron_schedule=None,
+        classifiers=None,
+        table_prefix=None,
+        update_behavior=None,
+        delete_behavior=None,
+        recrawl_behavior=None,
+        lineage_settings=None,
+        json_configuration=None,
+        security_configuration=None,
+        tags=None,
+        overwrite=False,
+        *args,
+        **kwargs,
+    ):
+
+        self.crawler_name = crawler_name
+        self.crawler_desc = crawler_desc
+        self.glue_db_name = glue_db_name
+        self.iam_role_name = iam_role_name
+        self.s3_targets_configuration = s3_targets_configuration
+        self.jdbc_targets_configuration = jdbc_targets_configuration
+        self.mongo_targets_configuration = mongo_targets_configuration
+        self.dynamo_targets_configuration = dynamo_targets_configuration
+        self.glue_catalog_targets_configuration = glue_catalog_targets_configuration
+        self.cron_schedule = cron_schedule
+        self.classifiers = classifiers
+        self.table_prefix = table_prefix
+        self.update_behavior = update_behavior
+        self.delete_behavior = delete_behavior
+        self.recrawl_behavior = recrawl_behavior
+        self.lineage_settings = lineage_settings
+        self.json_configuration = json_configuration
+        self.security_configuration = security_configuration
+        self.tags = tags
+        self.overwrite = overwrite
+        kwargs['client_type'] = 'glue'
+        super().__init__(*args, **kwargs)
+
+    def list_crawlers(self) -> List:
+        """:return: Lists of Crawlers"""
+        conn = self.get_conn()
+        return conn.get_crawlers()
+
+    def get_iam_execution_role(self) -> Dict:
+        """:return: iam role for crawler execution"""
+        iam_client = self.get_client_type('iam', self.region_name)
+
+        try:
+            glue_execution_role = iam_client.get_role(RoleName=self.iam_role_name)
+            self.log.info("Iam Role Name: %s", self.iam_role_name)
+            return glue_execution_role
+        except Exception as general_error:
+            self.log.error("Failed to create aws glue crawler, error: %s", general_error)
+            raise
+
+    def initialize_crawler(self):

Review comment:
       i might consider chopping this method because initialize is a bit confusing and it doesn't really do much.
   
   then in your operator execute you would just call
   ```python
   crawler_name = hook.get_or_create_glue_crawler(**crawler_kwargs)
   hook.start_crawler(crawler_name)
   hook.await_crawler(crawler_name)
   ```
   which i think is a tad clearer
   

##########
File path: airflow/providers/amazon/aws/operators/glue_crawler.py
##########
@@ -0,0 +1,164 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.glue_crawler import AwsGlueCrawlerHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AwsGlueCrawlerOperator(BaseOperator):
+    """
+    Creates an AWS Glue Crawler. AWS Glue Crawler is a serverless
+    service for infering the schema, format and data type of data store on the AWS cloud.
+    Language support: Python
+    :param crawler_name = Unique crawler name per AWS Account
+    :type crawler_name = Optional[str]
+    :param crawler_desc = Crawler description
+    :type crawler_desc = Optional[str]
+    :param glue_db_name = AWS glue catalog database ID
+    :type glue_db_name = Optional[str]
+    :param iam_role_name = AWS IAM role for glue crawler
+    :type iam_role_name = Optional[str]
+    :param region_name = AWS region name (e.g. 'us-west-2')
+    :type region_name = Optional[str]
+    :param s3_targets_configuration = Configurations for crawling AWS S3 paths
+    :type s3_targets_configuration = Optional[list]
+    :param jdbc_targets_configuration = Configurations for crawling JDBC paths
+    :type jdbc_targets_configuration = Optional[list]
+    :param mongo_targets_configuration = Configurations for crawling AWS DocumentDB or MongoDB
+    :type mongo_targets_configuration = Optional[list]
+    :param dynamo_targets_configuration = Configurations for crawling AWS DynamoDB
+    :type dynamo_targets_configuration = Optional[list]
+    :param glue_catalog_targets_configuration = Configurations for crawling AWS Glue CatalogDB
+    :type glue_catalog_targets_configuration = Optional[list]
+    :param cron_schedule = Cron expression used to define the crawler schedule (e.g. cron(11 18 * ? * *))
+    :type cron_schedule = Optional[str]
+    :param classifiers = List of user defined custom classifiers to be used by the crawler
+    :type classifiers = Optional[list]
+    :param table_prefix = Prefix for catalog table to be created
+    :type table_prefix = Optional[str]
+    :param update_behavior = Behavior when the crawler identifies schema changes
+    :type update_behavior = Optional[str]
+    :param delete_behavior = Behavior when the crawler identifies deleted objects
+    :type delete_behavior = Optional[str]
+    :param recrawl_behavior = Behavior when the crawler needs to crawl again
+    :type recrawl_behavior = Optional[str]
+    :param lineage_settings = Enables or disables data lineage
+    :type lineage_settings = Optional[str]
+    :param json_configuration = Versioned JSON configuration for the crawler
+    :type json_configuration = Optional[str]
+    :param security_configuration = Name of the security configuration structure to be used by the crawler.
+    :type security_configuration = Optional[str]
+    :param tags = Tags to attach to the crawler request
+    :type tags = Optional[dict]
+    :param overwrite = Determines if crawler should be updated if the crawler configuration change
+    :type overwrite = Optional[bool]
+    """
+
+    template_fields = ()
+    template_ext = ()
+    ui_color = '#ededed'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        crawler_name='aws_glue_default_crawler',
+        crawler_desc='AWS Glue Crawler with Airflow',
+        aws_conn_id='aws_default',
+        glue_db_name=None,
+        iam_role_name=None,
+        region_name=None,
+        s3_targets_configuration=None,
+        jdbc_targets_configuration=None,
+        mongo_targets_configuration=None,
+        dynamo_targets_configuration=None,
+        glue_catalog_targets_configuration=None,
+        cron_schedule=None,
+        classifiers=None,
+        table_prefix=None,

Review comment:
       in an operator, it does make sense to have the config as init params.
   but here i'd suggest consolidating to the more future-proof and simpler `crawler_kwargs`, a dict param

##########
File path: airflow/providers/amazon/aws/hooks/glue_crawler.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+from typing import Dict, List
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class AwsGlueCrawlerHook(AwsBaseHook):
+    """
+    Interacts with AWS Glue Crawler
+    :param crawler_name = Unique crawler name per AWS Account
+    :type crawler_name = Optional[str]
+    :param crawler_desc = Crawler description
+    :type crawler_desc = Optional[str]
+    :param glue_db_name = AWS glue catalog database ID
+    :type glue_db_name = Optional[str]
+    :param iam_role_name = AWS IAM role for glue crawler
+    :type iam_role_name = Optional[str]
+    :param region_name = AWS region name (e.g. 'us-west-2')
+    :type region_name = Optional[str]
+    :param s3_target_configuration = Configurations for crawling AWS S3 paths
+    :type s3_target_configuration = Optional[list]
+    :param jdbc_target_configuration = Configurations for crawling JDBC paths
+    :type jdbc_target_configuration = Optional[list]
+    :param mongo_target_configuration = Configurations for crawling AWS DocumentDB or MongoDB
+    :type mongo_target_configuration = Optional[list]
+    :param dynamo_target_configuration = Configurations for crawling AWS DynamoDB
+    :type dynamo_target_configuration = Optional[list]
+    :param glue_catalog_target_configuration = Configurations for crawling AWS Glue CatalogDB
+    :type glue_catalog_target_configuration = Optional[list]
+    :param cron_schedule = Cron expression used to define the crawler schedule (e.g. cron(11 18 * ? * *))
+    :type cron_schedule = Optional[str]
+    :param classifiers = List of user defined custom classifiers to be used by the crawler
+    :type classifiers = Optional[list]
+    :param table_prefix = Prefix for catalog table to be created
+    :type table_prefix = Optional[str]
+    :param update_behavior = Behavior when the crawler identifies schema changes
+    :type update_behavior = Optional[str]
+    :param delete_behavior = Behavior when the crawler identifies deleted objects
+    :type delete_behavior = Optional[str]
+    :param recrawl_behavior = Behavior when the crawler needs to crawl again
+    :type recrawl_behavior = Optional[str]
+    :param lineage_settings = Enables or disables data lineage
+    :type lineage_settings = Optional[str]
+    :param json_configuration = Versioned JSON configuration for the crawler
+    :type json_configuration = Optional[str]
+    :param security_configuration = Name of the security configuration structure to be used by the crawler.
+    :type security_configuration = Optional[str]
+    :param tags = Tags to attach to the crawler request
+    :type tags = Optional[dict]
+    :param overwrite = Determines if crawler should be updated if the crawler configuration change
+    :type overwrite = Optional[bool]
+    """
+
+    CRAWLER_POLL_INTERVAL = 6  # polls crawler status after every CRAWLER_POLL_INTERVAL seconds
+
+    def __init__(
+        self,
+        crawler_name=None,
+        crawler_desc=None,
+        glue_db_name=None,
+        iam_role_name=None,
+        s3_targets_configuration=None,
+        jdbc_targets_configuration=None,
+        mongo_targets_configuration=None,
+        dynamo_targets_configuration=None,
+        glue_catalog_targets_configuration=None,
+        cron_schedule=None,
+        classifiers=None,
+        table_prefix=None,
+        update_behavior=None,
+        delete_behavior=None,
+        recrawl_behavior=None,
+        lineage_settings=None,
+        json_configuration=None,
+        security_configuration=None,
+        tags=None,
+        overwrite=False,
+        *args,
+        **kwargs,
+    ):
+
+        self.crawler_name = crawler_name
+        self.crawler_desc = crawler_desc
+        self.glue_db_name = glue_db_name
+        self.iam_role_name = iam_role_name
+        self.s3_targets_configuration = s3_targets_configuration
+        self.jdbc_targets_configuration = jdbc_targets_configuration
+        self.mongo_targets_configuration = mongo_targets_configuration
+        self.dynamo_targets_configuration = dynamo_targets_configuration
+        self.glue_catalog_targets_configuration = glue_catalog_targets_configuration
+        self.cron_schedule = cron_schedule
+        self.classifiers = classifiers
+        self.table_prefix = table_prefix
+        self.update_behavior = update_behavior
+        self.delete_behavior = delete_behavior
+        self.recrawl_behavior = recrawl_behavior
+        self.lineage_settings = lineage_settings
+        self.json_configuration = json_configuration
+        self.security_configuration = security_configuration
+        self.tags = tags
+        self.overwrite = overwrite
+        kwargs['client_type'] = 'glue'
+        super().__init__(*args, **kwargs)
+
+    def list_crawlers(self) -> List:
+        """:return: Lists of Crawlers"""
+        conn = self.get_conn()
+        return conn.get_crawlers()
+
+    def get_iam_execution_role(self) -> Dict:
+        """:return: iam role for crawler execution"""
+        iam_client = self.get_client_type('iam', self.region_name)
+
+        try:
+            glue_execution_role = iam_client.get_role(RoleName=self.iam_role_name)
+            self.log.info("Iam Role Name: %s", self.iam_role_name)
+            return glue_execution_role
+        except Exception as general_error:
+            self.log.error("Failed to create aws glue crawler, error: %s", general_error)
+            raise
+
+    def initialize_crawler(self):
+        """
+        Initializes connection with AWS Glue to run crawler
+        :return:
+        """
+        glue_client = self.get_conn()
+
+        try:
+            crawler_name = self.get_or_create_glue_crawler()
+            crawler_run = glue_client.start_crawler(Name=crawler_name)
+            return crawler_run
+        except Exception as general_error:
+            self.log.error("Failed to run aws glue crawler, error: %s", general_error)
+            raise
+
+    def get_crawler_state(self, crawler_name: str) -> str:
+        """
+        Get state of the Glue crawler. The crawler state can be
+        ready, running, or stopping.
+        :param crawler_name: unique crawler name per AWS account
+        :type crawler_name: str
+        :return: State of the Glue crawler
+        """
+        glue_client = self.get_conn()
+        crawler_run = glue_client.get_crawler(Name=crawler_name)
+        crawler_run_state = crawler_run['Crawler']['State']
+        return crawler_run_state
+
+    def get_crawler_status(self, crawler_name: str) -> str:

Review comment:
       `get_crawl_status` -- it's the status of the crawl i.e. the run not the state of the crawler
   
   this would help clarify diff between `get_crawler_status` and `get_crawler_state` which sound like the same thing
   
   you could even do `get_last_crawl_status` to be explicit

##########
File path: airflow/providers/amazon/aws/hooks/glue_crawler.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+from typing import Dict, List
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class AwsGlueCrawlerHook(AwsBaseHook):
+    """
+    Interacts with AWS Glue Crawler
+    :param crawler_name = Unique crawler name per AWS Account
+    :type crawler_name = Optional[str]
+    :param crawler_desc = Crawler description
+    :type crawler_desc = Optional[str]
+    :param glue_db_name = AWS glue catalog database ID
+    :type glue_db_name = Optional[str]
+    :param iam_role_name = AWS IAM role for glue crawler
+    :type iam_role_name = Optional[str]
+    :param region_name = AWS region name (e.g. 'us-west-2')
+    :type region_name = Optional[str]
+    :param s3_target_configuration = Configurations for crawling AWS S3 paths
+    :type s3_target_configuration = Optional[list]
+    :param jdbc_target_configuration = Configurations for crawling JDBC paths
+    :type jdbc_target_configuration = Optional[list]
+    :param mongo_target_configuration = Configurations for crawling AWS DocumentDB or MongoDB
+    :type mongo_target_configuration = Optional[list]
+    :param dynamo_target_configuration = Configurations for crawling AWS DynamoDB
+    :type dynamo_target_configuration = Optional[list]
+    :param glue_catalog_target_configuration = Configurations for crawling AWS Glue CatalogDB
+    :type glue_catalog_target_configuration = Optional[list]
+    :param cron_schedule = Cron expression used to define the crawler schedule (e.g. cron(11 18 * ? * *))
+    :type cron_schedule = Optional[str]
+    :param classifiers = List of user defined custom classifiers to be used by the crawler
+    :type classifiers = Optional[list]
+    :param table_prefix = Prefix for catalog table to be created
+    :type table_prefix = Optional[str]
+    :param update_behavior = Behavior when the crawler identifies schema changes
+    :type update_behavior = Optional[str]
+    :param delete_behavior = Behavior when the crawler identifies deleted objects
+    :type delete_behavior = Optional[str]
+    :param recrawl_behavior = Behavior when the crawler needs to crawl again
+    :type recrawl_behavior = Optional[str]
+    :param lineage_settings = Enables or disables data lineage
+    :type lineage_settings = Optional[str]
+    :param json_configuration = Versioned JSON configuration for the crawler
+    :type json_configuration = Optional[str]
+    :param security_configuration = Name of the security configuration structure to be used by the crawler.
+    :type security_configuration = Optional[str]
+    :param tags = Tags to attach to the crawler request
+    :type tags = Optional[dict]
+    :param overwrite = Determines if crawler should be updated if the crawler configuration change
+    :type overwrite = Optional[bool]
+    """
+
+    CRAWLER_POLL_INTERVAL = 6  # polls crawler status after every CRAWLER_POLL_INTERVAL seconds
+
+    def __init__(
+        self,
+        crawler_name=None,
+        crawler_desc=None,
+        glue_db_name=None,
+        iam_role_name=None,
+        s3_targets_configuration=None,
+        jdbc_targets_configuration=None,
+        mongo_targets_configuration=None,
+        dynamo_targets_configuration=None,
+        glue_catalog_targets_configuration=None,
+        cron_schedule=None,
+        classifiers=None,
+        table_prefix=None,
+        update_behavior=None,
+        delete_behavior=None,
+        recrawl_behavior=None,
+        lineage_settings=None,
+        json_configuration=None,
+        security_configuration=None,
+        tags=None,
+        overwrite=False,
+        *args,
+        **kwargs,
+    ):
+
+        self.crawler_name = crawler_name
+        self.crawler_desc = crawler_desc
+        self.glue_db_name = glue_db_name
+        self.iam_role_name = iam_role_name
+        self.s3_targets_configuration = s3_targets_configuration
+        self.jdbc_targets_configuration = jdbc_targets_configuration
+        self.mongo_targets_configuration = mongo_targets_configuration
+        self.dynamo_targets_configuration = dynamo_targets_configuration
+        self.glue_catalog_targets_configuration = glue_catalog_targets_configuration
+        self.cron_schedule = cron_schedule
+        self.classifiers = classifiers
+        self.table_prefix = table_prefix
+        self.update_behavior = update_behavior
+        self.delete_behavior = delete_behavior
+        self.recrawl_behavior = recrawl_behavior
+        self.lineage_settings = lineage_settings
+        self.json_configuration = json_configuration
+        self.security_configuration = security_configuration
+        self.tags = tags
+        self.overwrite = overwrite
+        kwargs['client_type'] = 'glue'
+        super().__init__(*args, **kwargs)
+
+    def list_crawlers(self) -> List:
+        """:return: Lists of Crawlers"""
+        conn = self.get_conn()
+        return conn.get_crawlers()
+
+    def get_iam_execution_role(self) -> Dict:
+        """:return: iam role for crawler execution"""
+        iam_client = self.get_client_type('iam', self.region_name)
+
+        try:
+            glue_execution_role = iam_client.get_role(RoleName=self.iam_role_name)
+            self.log.info("Iam Role Name: %s", self.iam_role_name)
+            return glue_execution_role
+        except Exception as general_error:
+            self.log.error("Failed to create aws glue crawler, error: %s", general_error)
+            raise
+
+    def initialize_crawler(self):
+        """
+        Initializes connection with AWS Glue to run crawler
+        :return:
+        """
+        glue_client = self.get_conn()
+
+        try:
+            crawler_name = self.get_or_create_glue_crawler()
+            crawler_run = glue_client.start_crawler(Name=crawler_name)
+            return crawler_run
+        except Exception as general_error:
+            self.log.error("Failed to run aws glue crawler, error: %s", general_error)
+            raise
+
+    def get_crawler_state(self, crawler_name: str) -> str:
+        """
+        Get state of the Glue crawler. The crawler state can be
+        ready, running, or stopping.
+        :param crawler_name: unique crawler name per AWS account
+        :type crawler_name: str
+        :return: State of the Glue crawler
+        """
+        glue_client = self.get_conn()
+        crawler_run = glue_client.get_crawler(Name=crawler_name)
+        crawler_run_state = crawler_run['Crawler']['State']
+        return crawler_run_state
+
+    def get_crawler_status(self, crawler_name: str) -> str:
+        """
+        Get current status of the Glue crawler. The crawler
+        status can be succeeded, cancelled, or failed.
+        :param crawler_name: unique crawler name per AWS account
+        :type crawler_name: str
+        :return: Status of the Glue crawler
+        """
+        glue_client = self.get_conn()
+        crawler_run = glue_client.get_crawler(Name=crawler_name)
+        crawler_run_status = crawler_run['Crawler']['LastCrawl']['Status']
+        return crawler_run_status
+
+    def crawler_completion(self, crawler_name: str) -> str:

Review comment:
       maybe `wait_for_crawler_completion` or `await_crawler_completion` or `await_completion` or `wait_for_completion` -- something to indicate what it will do

##########
File path: airflow/providers/amazon/aws/hooks/glue_crawler.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+from typing import Dict, List
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class AwsGlueCrawlerHook(AwsBaseHook):
+    """
+    Interacts with AWS Glue Crawler
+    :param crawler_name = Unique crawler name per AWS Account
+    :type crawler_name = Optional[str]
+    :param crawler_desc = Crawler description
+    :type crawler_desc = Optional[str]
+    :param glue_db_name = AWS glue catalog database ID
+    :type glue_db_name = Optional[str]
+    :param iam_role_name = AWS IAM role for glue crawler
+    :type iam_role_name = Optional[str]
+    :param region_name = AWS region name (e.g. 'us-west-2')
+    :type region_name = Optional[str]
+    :param s3_target_configuration = Configurations for crawling AWS S3 paths
+    :type s3_target_configuration = Optional[list]
+    :param jdbc_target_configuration = Configurations for crawling JDBC paths
+    :type jdbc_target_configuration = Optional[list]
+    :param mongo_target_configuration = Configurations for crawling AWS DocumentDB or MongoDB
+    :type mongo_target_configuration = Optional[list]
+    :param dynamo_target_configuration = Configurations for crawling AWS DynamoDB
+    :type dynamo_target_configuration = Optional[list]
+    :param glue_catalog_target_configuration = Configurations for crawling AWS Glue CatalogDB
+    :type glue_catalog_target_configuration = Optional[list]
+    :param cron_schedule = Cron expression used to define the crawler schedule (e.g. cron(11 18 * ? * *))
+    :type cron_schedule = Optional[str]
+    :param classifiers = List of user defined custom classifiers to be used by the crawler
+    :type classifiers = Optional[list]
+    :param table_prefix = Prefix for catalog table to be created
+    :type table_prefix = Optional[str]
+    :param update_behavior = Behavior when the crawler identifies schema changes
+    :type update_behavior = Optional[str]
+    :param delete_behavior = Behavior when the crawler identifies deleted objects
+    :type delete_behavior = Optional[str]
+    :param recrawl_behavior = Behavior when the crawler needs to crawl again
+    :type recrawl_behavior = Optional[str]
+    :param lineage_settings = Enables or disables data lineage
+    :type lineage_settings = Optional[str]
+    :param json_configuration = Versioned JSON configuration for the crawler
+    :type json_configuration = Optional[str]
+    :param security_configuration = Name of the security configuration structure to be used by the crawler.
+    :type security_configuration = Optional[str]
+    :param tags = Tags to attach to the crawler request
+    :type tags = Optional[dict]
+    :param overwrite = Determines if crawler should be updated if the crawler configuration change
+    :type overwrite = Optional[bool]
+    """
+
+    CRAWLER_POLL_INTERVAL = 6  # polls crawler status after every CRAWLER_POLL_INTERVAL seconds
+
+    def __init__(
+        self,
+        crawler_name=None,
+        crawler_desc=None,
+        glue_db_name=None,
+        iam_role_name=None,
+        s3_targets_configuration=None,
+        jdbc_targets_configuration=None,
+        mongo_targets_configuration=None,
+        dynamo_targets_configuration=None,
+        glue_catalog_targets_configuration=None,
+        cron_schedule=None,
+        classifiers=None,
+        table_prefix=None,
+        update_behavior=None,
+        delete_behavior=None,
+        recrawl_behavior=None,
+        lineage_settings=None,
+        json_configuration=None,
+        security_configuration=None,
+        tags=None,
+        overwrite=False,
+        *args,
+        **kwargs,
+    ):
+
+        self.crawler_name = crawler_name
+        self.crawler_desc = crawler_desc
+        self.glue_db_name = glue_db_name
+        self.iam_role_name = iam_role_name
+        self.s3_targets_configuration = s3_targets_configuration
+        self.jdbc_targets_configuration = jdbc_targets_configuration
+        self.mongo_targets_configuration = mongo_targets_configuration
+        self.dynamo_targets_configuration = dynamo_targets_configuration
+        self.glue_catalog_targets_configuration = glue_catalog_targets_configuration
+        self.cron_schedule = cron_schedule
+        self.classifiers = classifiers
+        self.table_prefix = table_prefix
+        self.update_behavior = update_behavior
+        self.delete_behavior = delete_behavior
+        self.recrawl_behavior = recrawl_behavior
+        self.lineage_settings = lineage_settings
+        self.json_configuration = json_configuration
+        self.security_configuration = security_configuration
+        self.tags = tags
+        self.overwrite = overwrite
+        kwargs['client_type'] = 'glue'
+        super().__init__(*args, **kwargs)
+
+    def list_crawlers(self) -> List:
+        """:return: Lists of Crawlers"""
+        conn = self.get_conn()
+        return conn.get_crawlers()
+
+    def get_iam_execution_role(self) -> Dict:
+        """:return: iam role for crawler execution"""
+        iam_client = self.get_client_type('iam', self.region_name)
+
+        try:
+            glue_execution_role = iam_client.get_role(RoleName=self.iam_role_name)
+            self.log.info("Iam Role Name: %s", self.iam_role_name)
+            return glue_execution_role
+        except Exception as general_error:
+            self.log.error("Failed to create aws glue crawler, error: %s", general_error)
+            raise
+
+    def initialize_crawler(self):
+        """
+        Initializes connection with AWS Glue to run crawler
+        :return:
+        """
+        glue_client = self.get_conn()
+
+        try:
+            crawler_name = self.get_or_create_glue_crawler()
+            crawler_run = glue_client.start_crawler(Name=crawler_name)
+            return crawler_run
+        except Exception as general_error:
+            self.log.error("Failed to run aws glue crawler, error: %s", general_error)
+            raise
+
+    def get_crawler_state(self, crawler_name: str) -> str:
+        """
+        Get state of the Glue crawler. The crawler state can be
+        ready, running, or stopping.
+        :param crawler_name: unique crawler name per AWS account
+        :type crawler_name: str
+        :return: State of the Glue crawler
+        """
+        glue_client = self.get_conn()
+        crawler_run = glue_client.get_crawler(Name=crawler_name)
+        crawler_run_state = crawler_run['Crawler']['State']
+        return crawler_run_state
+
+    def get_crawler_status(self, crawler_name: str) -> str:
+        """
+        Get current status of the Glue crawler. The crawler
+        status can be succeeded, cancelled, or failed.
+        :param crawler_name: unique crawler name per AWS account
+        :type crawler_name: str
+        :return: Status of the Glue crawler
+        """
+        glue_client = self.get_conn()
+        crawler_run = glue_client.get_crawler(Name=crawler_name)
+        crawler_run_status = crawler_run['Crawler']['LastCrawl']['Status']
+        return crawler_run_status
+
+    def crawler_completion(self, crawler_name: str) -> str:
+        """
+        Waits until Glue crawler with crawler_name completes or
+        fails and returns final state if finished.
+        Raises AirflowException when the crawler failed
+        :param crawler_name: unique crawler name per AWS account
+        :type crawler_name: str
+        :return: Dict of crawler's status
+        """
+        failed_status = ['FAILED', 'CANCELLED']
+
+        while True:
+            crawler_run_state = self.get_crawler_state(crawler_name)
+            if crawler_run_state == 'READY':
+                self.log.info("Crawler: %s State: %s", crawler_name, crawler_run_state)
+                crawler_run_status = self.get_crawler_status(crawler_name)
+                if crawler_run_status in failed_status:
+                    crawler_error_message = (
+                        "Exiting Crawler: " + crawler_name + " Run State: " + crawler_run_state
+                    )
+                    self.log.info(crawler_error_message)
+                    raise AirflowException(crawler_error_message)
+                else:
+                    self.log.info("Crawler Status: %s", crawler_run_status)
+                    metrics = self.get_crawler_metrics(self.crawler_name)
+                    print('Last Runtime Duration (seconds): ', metrics['LastRuntimeSeconds'])
+                    print('Median Runtime Duration (seconds): ', metrics['MedianRuntimeSeconds'])
+                    print('Tables Created: ', metrics['TablesCreated'])
+                    print('Tables Updated: ', metrics['TablesUpdated'])
+                    print('Tables Deleted: ', metrics['TablesDeleted'])
+
+                    return {'Status': crawler_run_status}

Review comment:
       why not just return `crawler_run_status` instead of wrapping it in a dict?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org