You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fe...@apache.org on 2021/01/25 07:24:22 UTC

[airflow] branch master updated: AWS Glue Crawler Integration (#13072)

This is an automated email from the ASF dual-hosted git repository.

feluelle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d55f32  AWS Glue Crawler Integration (#13072)
6d55f32 is described below

commit 6d55f329f93c5cd1e94973194c0cd7caa65309e1
Author: Marshall Mamiya <44...@users.noreply.github.com>
AuthorDate: Sun Jan 24 23:24:07 2021 -0800

    AWS Glue Crawler Integration (#13072)
    
    This change integrates an AWS glue crawler operator, hook and sensor that can be used to trigger glue crawlers from Airflow.
    
    Co-authored-by: Kamil Breguła <ka...@polidea.com>
---
 airflow/providers/amazon/aws/hooks/glue_crawler.py | 171 +++++++++++++++++
 .../providers/amazon/aws/operators/glue_crawler.py |  77 ++++++++
 .../providers/amazon/aws/sensors/glue_crawler.py   |  67 +++++++
 airflow/providers/amazon/provider.yaml             |   3 +
 .../amazon/aws/hooks/test_glue_crawler.py          | 211 +++++++++++++++++++++
 .../amazon/aws/operators/test_glue_crawler.py      | 100 ++++++++++
 .../amazon/aws/sensors/test_glue_crawler.py        |  55 ++++++
 7 files changed, 684 insertions(+)

diff --git a/airflow/providers/amazon/aws/hooks/glue_crawler.py b/airflow/providers/amazon/aws/hooks/glue_crawler.py
new file mode 100644
index 0000000..d9bb91a
--- /dev/null
+++ b/airflow/providers/amazon/aws/hooks/glue_crawler.py
@@ -0,0 +1,171 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+
+from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class AwsGlueCrawlerHook(AwsBaseHook):
+    """
+    Interacts with AWS Glue Crawler.
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    def __init__(self, *args, **kwargs):
+        kwargs['client_type'] = 'glue'
+        super().__init__(*args, **kwargs)
+
+    @cached_property
+    def glue_client(self):
+        """:return: AWS Glue client"""
+        return self.get_conn()
+
+    def has_crawler(self, crawler_name) -> bool:
+        """
+        Checks if the crawler already exists
+
+        :param crawler_name: unique crawler name per AWS account
+        :type crawler_name: str
+        :return: Returns True if the crawler already exists and False if not.
+        """
+        self.log.info("Checking if crawler already exists: %s", crawler_name)
+
+        try:
+            self.get_crawler(crawler_name)
+            return True
+        except self.glue_client.exceptions.EntityNotFoundException:
+            return False
+
+    def get_crawler(self, crawler_name: str) -> dict:
+        """
+        Gets crawler configurations
+
+        :param crawler_name: unique crawler name per AWS account
+        :type crawler_name: str
+        :return: Nested dictionary of crawler configurations
+        """
+        return self.glue_client.get_crawler(Name=crawler_name)['Crawler']
+
+    def update_crawler(self, **crawler_kwargs) -> str:
+        """
+        Updates crawler configurations
+
+        :param crawler_kwargs: Keyword args that define the configurations used for the crawler
+        :type crawler_kwargs: any
+        :return: True if crawler was updated and false otherwise
+        """
+        crawler_name = crawler_kwargs['Name']
+        current_crawler = self.get_crawler(crawler_name)
+
+        update_config = {
+            key: value for key, value in crawler_kwargs.items() if current_crawler[key] != crawler_kwargs[key]
+        }
+        if update_config != {}:
+            self.log.info("Updating crawler: %s", crawler_name)
+            self.glue_client.update_crawler(**crawler_kwargs)
+            self.log.info("Updated configurations: %s", update_config)
+            return True
+        else:
+            return False
+
+    def create_crawler(self, **crawler_kwargs) -> str:
+        """
+        Creates an AWS Glue Crawler
+
+        :param crawler_kwargs: Keyword args that define the configurations used to create the crawler
+        :type crawler_kwargs: any
+        :return: Name of the crawler
+        """
+        crawler_name = crawler_kwargs['Name']
+        self.log.info("Creating crawler: %s", crawler_name)
+        return self.glue_client.create_crawler(**crawler_kwargs)['Crawler']['Name']
+
+    def start_crawler(self, crawler_name: str) -> dict:
+        """
+        Triggers the AWS Glue crawler
+
+        :param crawler_name: unique crawler name per AWS account
+        :type crawler_name: str
+        :return: Empty dictionary
+        """
+        self.log.info("Starting crawler %s", crawler_name)
+        crawler = self.glue_client.start_crawler(Name=crawler_name)
+        return crawler
+
+    def wait_for_crawler_completion(self, crawler_name: str, poll_interval: int = 5) -> str:
+        """
+        Waits until Glue crawler completes and
+        returns the status of the latest crawl run.
+        Raises AirflowException if the crawler fails or is cancelled.
+
+        :param crawler_name: unique crawler name per AWS account
+        :type crawler_name: str
+        :param poll_interval: Time (in seconds) to wait between two consecutive calls to check crawler status
+        :type poll_interval: int
+        :return: Crawler's status
+        """
+        failed_status = ['FAILED', 'CANCELLED']
+
+        while True:
+            crawler = self.get_crawler(crawler_name)
+            crawler_state = crawler['State']
+            if crawler_state == 'READY':
+                self.log.info("State: %s", crawler_state)
+                self.log.info("crawler_config: %s", crawler)
+                crawler_status = crawler['LastCrawl']['Status']
+                if crawler_status in failed_status:
+                    raise AirflowException(
+                        f"Status: {crawler_status}"
+                    )  # pylint: disable=raising-format-tuple
+                else:
+                    metrics = self.glue_client.get_crawler_metrics(CrawlerNameList=[crawler_name])[
+                        'CrawlerMetricsList'
+                    ][0]
+                    self.log.info("Status: %s", crawler_status)
+                    self.log.info("Last Runtime Duration (seconds): %s", metrics['LastRuntimeSeconds'])
+                    self.log.info("Median Runtime Duration (seconds): %s", metrics['MedianRuntimeSeconds'])
+                    self.log.info("Tables Created: %s", metrics['TablesCreated'])
+                    self.log.info("Tables Updated: %s", metrics['TablesUpdated'])
+                    self.log.info("Tables Deleted: %s", metrics['TablesDeleted'])
+
+                    return crawler_status
+
+            else:
+                self.log.info("Polling for AWS Glue crawler: %s ", crawler_name)
+                self.log.info("State: %s", crawler_state)
+
+                metrics = self.glue_client.get_crawler_metrics(CrawlerNameList=[crawler_name])[
+                    'CrawlerMetricsList'
+                ][0]
+                time_left = int(metrics['TimeLeftSeconds'])
+
+                if time_left > 0:
+                    self.log.info("Estimated Time Left (seconds): %s", time_left)
+                else:
+                    self.log.info("Crawler should finish soon")
+
+                sleep(poll_interval)
diff --git a/airflow/providers/amazon/aws/operators/glue_crawler.py b/airflow/providers/amazon/aws/operators/glue_crawler.py
new file mode 100644
index 0000000..82807bc
--- /dev/null
+++ b/airflow/providers/amazon/aws/operators/glue_crawler.py
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from cached_property import cached_property
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.glue_crawler import AwsGlueCrawlerHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AwsGlueCrawlerOperator(BaseOperator):
+    """
+    Creates, updates and triggers an AWS Glue Crawler. AWS Glue Crawler is a serverless
+    service that manages a catalog of metadata tables that contain the inferred
+    schema, format and data types of data stores within the AWS cloud.
+
+    :param config: Configurations for the AWS Glue crawler
+    :type config: dict
+    :param aws_conn_id: aws connection to use
+    :type aws_conn_id: Optional[str]
+    :param poll_interval: Time (in seconds) to wait between two consecutive calls to check crawler status
+    :type poll_interval: Optional[int]
+    """
+
+    ui_color = '#ededed'
+
+    @apply_defaults
+    def __init__(
+        self,
+        config,
+        aws_conn_id='aws_default',
+        poll_interval: int = 5,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.aws_conn_id = aws_conn_id
+        self.poll_interval = poll_interval
+        self.config = config
+
+    @cached_property
+    def hook(self) -> AwsGlueCrawlerHook:
+        """Create and return an AwsGlueCrawlerHook."""
+        return AwsGlueCrawlerHook(self.aws_conn_id)
+
+    def execute(self, context):
+        """
+        Executes AWS Glue Crawler from Airflow
+
+        :return: the name of the current glue crawler.
+        """
+        crawler_name = self.config['Name']
+        if self.hook.has_crawler(crawler_name):
+            self.hook.update_crawler(**self.config)
+        else:
+            self.hook.create_crawler(**self.config)
+
+        self.log.info("Triggering AWS Glue Crawler")
+        self.hook.start_crawler(crawler_name)
+        self.log.info("Waiting for AWS Glue Crawler")
+        self.hook.wait_for_crawler_completion(crawler_name=crawler_name, poll_interval=self.poll_interval)
+
+        return crawler_name
diff --git a/airflow/providers/amazon/aws/sensors/glue_crawler.py b/airflow/providers/amazon/aws/sensors/glue_crawler.py
new file mode 100644
index 0000000..4ecd4a0
--- /dev/null
+++ b/airflow/providers/amazon/aws/sensors/glue_crawler.py
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.glue_crawler import AwsGlueCrawlerHook
+from airflow.sensors.base import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AwsGlueCrawlerSensor(BaseSensorOperator):
+    """
+    Waits for an AWS Glue crawler to reach any of the statuses below
+    'FAILED', 'CANCELLED', 'SUCCEEDED'
+
+    :param crawler_name: The AWS Glue crawler unique name
+    :type crawler_name: str
+    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
+    :type aws_conn_id: str
+    """
+
+    @apply_defaults
+    def __init__(self, *, crawler_name: str, aws_conn_id: str = 'aws_default', **kwargs) -> None:
+        super().__init__(**kwargs)
+        self.crawler_name = crawler_name
+        self.aws_conn_id = aws_conn_id
+        self.success_statuses = 'SUCCEEDED'
+        self.errored_statuses = ('FAILED', 'CANCELLED')
+        self.hook: Optional[AwsGlueCrawlerHook] = None
+
+    def poke(self, context):
+        hook = self.get_hook()
+        self.log.info("Poking for AWS Glue crawler: %s", self.crawler_name)
+        crawler_state = hook.get_crawler(self.crawler_name)['State']
+        if crawler_state == 'READY':
+            self.log.info("State: %s", crawler_state)
+            crawler_status = hook.get_crawler(self.crawler_name)['LastCrawl']['Status']
+            if crawler_status == self.success_statuses:
+                self.log.info("Status: %s", crawler_status)
+                return True
+            else:
+                raise AirflowException(f"Status: {crawler_status}")
+        else:
+            return False
+
+    def get_hook(self) -> AwsGlueCrawlerHook:
+        """Returns a new or pre-existing AwsGlueCrawlerHook"""
+        if self.hook:
+            return self.hook
+
+        self.hook = AwsGlueCrawlerHook(aws_conn_id=self.aws_conn_id)
+        return self.hook
diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml
index 2890fb0..692daa6 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -161,6 +161,7 @@ operators:
   - integration-name: AWS Glue
     python-modules:
       - airflow.providers.amazon.aws.operators.glue
+      - airflow.providers.amazon.aws.operators.glue_crawler
   - integration-name: Amazon Simple Storage Service (S3)
     python-modules:
       - airflow.providers.amazon.aws.operators.s3_bucket
@@ -210,6 +211,7 @@ sensors:
   - integration-name: AWS Glue
     python-modules:
       - airflow.providers.amazon.aws.sensors.glue
+      - airflow.providers.amazon.aws.sensors.glue_crawler
       - airflow.providers.amazon.aws.sensors.glue_catalog_partition
   - integration-name: Amazon Redshift
     python-modules:
@@ -269,6 +271,7 @@ hooks:
   - integration-name: AWS Glue
     python-modules:
       - airflow.providers.amazon.aws.hooks.glue
+      - airflow.providers.amazon.aws.hooks.glue_crawler
       - airflow.providers.amazon.aws.hooks.glue_catalog
   - integration-name: Amazon Kinesis Data Firehose
     python-modules:
diff --git a/tests/providers/amazon/aws/hooks/test_glue_crawler.py b/tests/providers/amazon/aws/hooks/test_glue_crawler.py
new file mode 100644
index 0000000..7230235
--- /dev/null
+++ b/tests/providers/amazon/aws/hooks/test_glue_crawler.py
@@ -0,0 +1,211 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from copy import deepcopy
+from unittest import mock
+
+from airflow.providers.amazon.aws.hooks.glue_crawler import AwsGlueCrawlerHook
+
+mock_crawler_name = 'test-crawler'
+mock_role_name = 'test-role'
+mock_config = {
+    'Name': mock_crawler_name,
+    'Description': 'Test glue crawler from Airflow',
+    'DatabaseName': 'test_db',
+    'Role': mock_role_name,
+    'Targets': {
+        'S3Targets': [
+            {
+                'Path': 's3://test-glue-crawler/foo/',
+                'Exclusions': [
+                    's3://test-glue-crawler/bar/',
+                ],
+                'ConnectionName': 'test-s3-conn',
+            }
+        ],
+        'JdbcTargets': [
+            {
+                'ConnectionName': 'test-jdbc-conn',
+                'Path': 'test_db/test_table>',
+                'Exclusions': [
+                    'string',
+                ],
+            }
+        ],
+        'MongoDBTargets': [
+            {'ConnectionName': 'test-mongo-conn', 'Path': 'test_db/test_collection', 'ScanAll': True}
+        ],
+        'DynamoDBTargets': [{'Path': 'test_db/test_table', 'scanAll': True, 'scanRate': 123.0}],
+        'CatalogTargets': [
+            {
+                'DatabaseName': 'test_glue_db',
+                'Tables': [
+                    'test',
+                ],
+            }
+        ],
+    },
+    'Classifiers': ['test-classifier'],
+    'TablePrefix': 'test',
+    'SchemaChangePolicy': {
+        'UpdateBehavior': 'UPDATE_IN_DATABASE',
+        'DeleteBehavior': 'DEPRECATE_IN_DATABASE',
+    },
+    'RecrawlPolicy': {'RecrawlBehavior': 'CRAWL_EVERYTHING'},
+    'LineageConfiguration': 'ENABLE',
+    'Configuration': """
+    {
+        "Version": 1.0,
+        "CrawlerOutput": {
+            "Partitions": { "AddOrUpdateBehavior": "InheritFromTable" }
+        }
+    }
+    """,
+    'SecurityConfiguration': 'test',
+    'Tags': {'test': 'foo'},
+}
+
+
+class TestAwsGlueCrawlerHook(unittest.TestCase):
+    @classmethod
+    def setUp(cls):
+        cls.hook = AwsGlueCrawlerHook(aws_conn_id="aws_default")
+
+    def test_init(self):
+        self.assertEqual(self.hook.aws_conn_id, "aws_default")
+
+    @mock.patch.object(AwsGlueCrawlerHook, "get_conn")
+    def test_has_crawler(self, mock_get_conn):
+        response = self.hook.has_crawler(mock_crawler_name)
+        self.assertEqual(response, True)
+        mock_get_conn.return_value.get_crawler.assert_called_once_with(Name=mock_crawler_name)
+
+    @mock.patch.object(AwsGlueCrawlerHook, "get_conn")
+    def test_has_crawler_crawled_doesnt_exists(self, mock_get_conn):
+        class MockException(Exception):
+            pass
+
+        mock_get_conn.return_value.exceptions.EntityNotFoundException = MockException
+        mock_get_conn.return_value.get_crawler.side_effect = MockException("AAA")
+        response = self.hook.has_crawler(mock_crawler_name)
+        self.assertEqual(response, False)
+        mock_get_conn.return_value.get_crawler.assert_called_once_with(Name=mock_crawler_name)
+
+    @mock.patch.object(AwsGlueCrawlerHook, "get_conn")
+    def test_update_crawler_needed(self, mock_get_conn):
+        mock_get_conn.return_value.get_crawler.return_value = {'Crawler': mock_config}
+
+        mock_config_two = deepcopy(mock_config)
+        mock_config_two['Role'] = 'test-2-role'
+        response = self.hook.update_crawler(**mock_config_two)
+        self.assertEqual(response, True)
+        mock_get_conn.return_value.get_crawler.assert_called_once_with(Name=mock_crawler_name)
+        mock_get_conn.return_value.update_crawler.assert_called_once_with(**mock_config_two)
+
+    @mock.patch.object(AwsGlueCrawlerHook, "get_conn")
+    def test_update_crawler_not_needed(self, mock_get_conn):
+        mock_get_conn.return_value.get_crawler.return_value = {'Crawler': mock_config}
+        response = self.hook.update_crawler(**mock_config)
+        self.assertEqual(response, False)
+        mock_get_conn.return_value.get_crawler.assert_called_once_with(Name=mock_crawler_name)
+
+    @mock.patch.object(AwsGlueCrawlerHook, "get_conn")
+    def test_create_crawler(self, mock_get_conn):
+        mock_get_conn.return_value.create_crawler.return_value = {'Crawler': {'Name': mock_crawler_name}}
+        glue_crawler = self.hook.create_crawler(**mock_config)
+
+        self.assertEqual(glue_crawler, mock_crawler_name)
+
+    @mock.patch.object(AwsGlueCrawlerHook, "get_conn")
+    def test_start_crawler(self, mock_get_conn):
+        result = self.hook.start_crawler(mock_crawler_name)
+        self.assertEqual(result, mock_get_conn.return_value.start_crawler.return_value)
+
+        mock_get_conn.return_value.start_crawler.assert_called_once_with(Name=mock_crawler_name)
+
+    @mock.patch.object(AwsGlueCrawlerHook, "get_crawler")
+    @mock.patch.object(AwsGlueCrawlerHook, "get_conn")
+    def test_wait_for_crawler_completion_instant_ready(self, mock_get_conn, mock_get_crawler):
+        mock_get_crawler.side_effect = [
+            {'State': 'READY', 'LastCrawl': {'Status': 'MOCK_STATUS'}},
+        ]
+        mock_get_conn.return_value.get_crawler_metrics.return_value = {
+            'CrawlerMetricsList': [
+                {
+                    'LastRuntimeSeconds': 'TEST-A',
+                    'MedianRuntimeSeconds': 'TEST-B',
+                    'TablesCreated': 'TEST-C',
+                    'TablesUpdated': 'TEST-D',
+                    'TablesDeleted': 'TEST-E',
+                }
+            ]
+        }
+        result = self.hook.wait_for_crawler_completion(mock_crawler_name)
+        self.assertEqual(result, 'MOCK_STATUS')
+        mock_get_conn.assert_has_calls(
+            [
+                mock.call(),
+                mock.call().get_crawler_metrics(CrawlerNameList=[mock_crawler_name]),
+            ]
+        )
+        mock_get_crawler.assert_has_calls(
+            [
+                mock.call(mock_crawler_name),
+            ]
+        )
+
+    @mock.patch.object(AwsGlueCrawlerHook, "get_conn")
+    @mock.patch.object(AwsGlueCrawlerHook, "get_crawler")
+    @mock.patch('airflow.providers.amazon.aws.hooks.glue_crawler.sleep')
+    def test_wait_for_crawler_completion_retry_two_times(self, mock_sleep, mock_get_crawler, mock_get_conn):
+        mock_get_crawler.side_effect = [
+            {'State': 'RUNNING'},
+            {'State': 'READY', 'LastCrawl': {'Status': 'MOCK_STATUS'}},
+        ]
+        mock_get_conn.return_value.get_crawler_metrics.side_effect = [
+            {'CrawlerMetricsList': [{'TimeLeftSeconds': 12}]},
+            {
+                'CrawlerMetricsList': [
+                    {
+                        'LastRuntimeSeconds': 'TEST-A',
+                        'MedianRuntimeSeconds': 'TEST-B',
+                        'TablesCreated': 'TEST-C',
+                        'TablesUpdated': 'TEST-D',
+                        'TablesDeleted': 'TEST-E',
+                    }
+                ]
+            },
+        ]
+        result = self.hook.wait_for_crawler_completion(mock_crawler_name)
+        self.assertEqual(result, 'MOCK_STATUS')
+        mock_get_conn.assert_has_calls(
+            [
+                mock.call(),
+                mock.call().get_crawler_metrics(CrawlerNameList=[mock_crawler_name]),
+            ]
+        )
+        mock_get_crawler.assert_has_calls(
+            [
+                mock.call(mock_crawler_name),
+                mock.call(mock_crawler_name),
+            ]
+        )
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/tests/providers/amazon/aws/operators/test_glue_crawler.py b/tests/providers/amazon/aws/operators/test_glue_crawler.py
new file mode 100644
index 0000000..be10ef4
--- /dev/null
+++ b/tests/providers/amazon/aws/operators/test_glue_crawler.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import mock
+
+from airflow.providers.amazon.aws.operators.glue_crawler import AwsGlueCrawlerOperator
+
+mock_crawler_name = 'test-crawler'
+mock_role_name = 'test-role'
+mock_config = {
+    'Name': mock_crawler_name,
+    'Description': 'Test glue crawler from Airflow',
+    'DatabaseName': 'test_db',
+    'Role': mock_role_name,
+    'Targets': {
+        'S3Targets': [
+            {
+                'Path': 's3://test-glue-crawler/foo/',
+                'Exclusions': [
+                    's3://test-glue-crawler/bar/',
+                ],
+                'ConnectionName': 'test-s3-conn',
+            }
+        ],
+        'JdbcTargets': [
+            {
+                'ConnectionName': 'test-jdbc-conn',
+                'Path': 'test_db/test_table>',
+                'Exclusions': [
+                    'string',
+                ],
+            }
+        ],
+        'MongoDBTargets': [
+            {'ConnectionName': 'test-mongo-conn', 'Path': 'test_db/test_collection', 'ScanAll': True}
+        ],
+        'DynamoDBTargets': [{'Path': 'test_db/test_table', 'scanAll': True, 'scanRate': 123.0}],
+        'CatalogTargets': [
+            {
+                'DatabaseName': 'test_glue_db',
+                'Tables': [
+                    'test',
+                ],
+            }
+        ],
+    },
+    'Classifiers': ['test-classifier'],
+    'TablePrefix': 'test',
+    'SchemaChangePolicy': {
+        'UpdateBehavior': 'UPDATE_IN_DATABASE',
+        'DeleteBehavior': 'DEPRECATE_IN_DATABASE',
+    },
+    'RecrawlPolicy': {'RecrawlBehavior': 'CRAWL_EVERYTHING'},
+    'LineageConfiguration': 'ENABLE',
+    'Configuration': """
+    {
+        "Version": 1.0,
+        "CrawlerOutput": {
+            "Partitions": { "AddOrUpdateBehavior": "InheritFromTable" }
+        }
+    }
+    """,
+    'SecurityConfiguration': 'test',
+    'Tags': {'test': 'foo'},
+}
+
+
+class TestAwsGlueCrawlerOperator(unittest.TestCase):
+    def setUp(self):
+        self.glue = AwsGlueCrawlerOperator(task_id='test_glue_crawler_operator', config=mock_config)
+
+    @mock.patch('airflow.providers.amazon.aws.operators.glue_crawler.AwsGlueCrawlerHook')
+    def test_execute_without_failure(self, mock_hook):
+        mock_hook.return_value.has_crawler.return_value = True
+        self.glue.execute(None)
+
+        mock_hook.assert_has_calls(
+            [
+                mock.call('aws_default'),
+                mock.call().has_crawler('test-crawler'),
+                mock.call().update_crawler(**mock_config),
+                mock.call().start_crawler(mock_crawler_name),
+                mock.call().wait_for_crawler_completion(crawler_name=mock_crawler_name, poll_interval=5),
+            ]
+        )
diff --git a/tests/providers/amazon/aws/sensors/test_glue_crawler.py b/tests/providers/amazon/aws/sensors/test_glue_crawler.py
new file mode 100644
index 0000000..749edfe
--- /dev/null
+++ b/tests/providers/amazon/aws/sensors/test_glue_crawler.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import mock
+
+from airflow.providers.amazon.aws.hooks.glue_crawler import AwsGlueCrawlerHook
+from airflow.providers.amazon.aws.sensors.glue_crawler import AwsGlueCrawlerSensor
+
+
+class TestAwsGlueCrawlerSensor(unittest.TestCase):
+    def setUp(self):
+        self.sensor = AwsGlueCrawlerSensor(
+            task_id='test_glue_crawler_sensor',
+            crawler_name='aws_test_glue_crawler',
+            poke_interval=1,
+            timeout=5,
+            aws_conn_id='aws_default',
+        )
+
+    @mock.patch.object(AwsGlueCrawlerHook, 'get_crawler')
+    def test_poke_success(self, mock_get_crawler):
+        mock_get_crawler.return_value['LastCrawl']['Status'] = "SUCCEEDED"
+        self.assertFalse(self.sensor.poke(None))
+        mock_get_crawler.assert_called_once_with('aws_test_glue_crawler')
+
+    @mock.patch.object(AwsGlueCrawlerHook, 'get_crawler')
+    def test_poke_failed(self, mock_get_crawler):
+        mock_get_crawler.return_value['LastCrawl']['Status'] = "FAILED"
+        self.assertFalse(self.sensor.poke(None))
+        mock_get_crawler.assert_called_once_with('aws_test_glue_crawler')
+
+    @mock.patch.object(AwsGlueCrawlerHook, 'get_crawler')
+    def test_poke_cancelled(self, mock_get_crawler):
+        mock_get_crawler.return_value['LastCrawl']['Status'] = "CANCELLED"
+        self.assertFalse(self.sensor.poke(None))
+        mock_get_crawler.assert_called_once_with('aws_test_glue_crawler')
+
+
+if __name__ == '__main__':
+    unittest.main()