You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fe...@apache.org on 2021/01/25 07:24:22 UTC
[airflow] branch master updated: AWS Glue Crawler Integration
(#13072)
This is an automated email from the ASF dual-hosted git repository.
feluelle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 6d55f32 AWS Glue Crawler Integration (#13072)
6d55f32 is described below
commit 6d55f329f93c5cd1e94973194c0cd7caa65309e1
Author: Marshall Mamiya <44...@users.noreply.github.com>
AuthorDate: Sun Jan 24 23:24:07 2021 -0800
AWS Glue Crawler Integration (#13072)
This change integrates an AWS glue crawler operator, hook and sensor that can be used to trigger glue crawlers from Airflow.
Co-authored-by: Kamil Breguła <ka...@polidea.com>
---
airflow/providers/amazon/aws/hooks/glue_crawler.py | 171 +++++++++++++++++
.../providers/amazon/aws/operators/glue_crawler.py | 77 ++++++++
.../providers/amazon/aws/sensors/glue_crawler.py | 67 +++++++
airflow/providers/amazon/provider.yaml | 3 +
.../amazon/aws/hooks/test_glue_crawler.py | 211 +++++++++++++++++++++
.../amazon/aws/operators/test_glue_crawler.py | 100 ++++++++++
.../amazon/aws/sensors/test_glue_crawler.py | 55 ++++++
7 files changed, 684 insertions(+)
diff --git a/airflow/providers/amazon/aws/hooks/glue_crawler.py b/airflow/providers/amazon/aws/hooks/glue_crawler.py
new file mode 100644
index 0000000..d9bb91a
--- /dev/null
+++ b/airflow/providers/amazon/aws/hooks/glue_crawler.py
@@ -0,0 +1,171 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+
+from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
class AwsGlueCrawlerHook(AwsBaseHook):
    """
    Interacts with AWS Glue Crawler.

    Additional arguments (such as ``aws_conn_id``) may be specified and
    are passed down to the underlying AwsBaseHook.

    .. seealso::
        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
    """

    def __init__(self, *args, **kwargs):
        # This hook only ever talks to Glue, so the client type is forced.
        kwargs['client_type'] = 'glue'
        super().__init__(*args, **kwargs)

    @cached_property
    def glue_client(self):
        """:return: AWS Glue client (created once per hook instance, then cached)"""
        return self.get_conn()

    def has_crawler(self, crawler_name: str) -> bool:
        """
        Checks if the crawler already exists

        :param crawler_name: unique crawler name per AWS account
        :type crawler_name: str
        :return: Returns True if the crawler already exists and False if not.
        """
        self.log.info("Checking if crawler already exists: %s", crawler_name)

        try:
            self.get_crawler(crawler_name)
            return True
        except self.glue_client.exceptions.EntityNotFoundException:
            return False

    def get_crawler(self, crawler_name: str) -> dict:
        """
        Gets crawler configurations

        :param crawler_name: unique crawler name per AWS account
        :type crawler_name: str
        :return: Nested dictionary of crawler configurations
        """
        return self.glue_client.get_crawler(Name=crawler_name)['Crawler']

    def update_crawler(self, **crawler_kwargs) -> bool:
        """
        Updates crawler configurations

        The Glue ``UpdateCrawler`` API is only called when at least one supplied
        setting differs from the crawler's current configuration.

        :param crawler_kwargs: Keyword args that define the configurations used for the crawler;
            must contain ``Name``
        :type crawler_kwargs: any
        :return: True if crawler was updated and False otherwise
        """
        crawler_name = crawler_kwargs['Name']
        current_crawler = self.get_crawler(crawler_name)

        # ``.get`` so that a requested setting missing from the current crawler
        # description counts as a change instead of raising KeyError.
        update_config = {
            key: value for key, value in crawler_kwargs.items() if current_crawler.get(key) != value
        }
        if not update_config:
            return False

        self.log.info("Updating crawler: %s", crawler_name)
        # NOTE(review): every kwarg is forwarded unchanged; confirm UpdateCrawler accepts all
        # of them (``Tags`` in particular may need to go through TagResource instead).
        self.glue_client.update_crawler(**crawler_kwargs)
        self.log.info("Updated configurations: %s", update_config)
        return True

    def create_crawler(self, **crawler_kwargs) -> str:
        """
        Creates an AWS Glue Crawler

        :param crawler_kwargs: Keyword args that define the configurations used to create the crawler;
            must contain ``Name``
        :type crawler_kwargs: any
        :return: Name of the crawler
        """
        crawler_name = crawler_kwargs['Name']
        self.log.info("Creating crawler: %s", crawler_name)
        # The CreateCrawler response body is empty, so return the requested name
        # rather than trying to read it back from the response.
        self.glue_client.create_crawler(**crawler_kwargs)
        return crawler_name

    def start_crawler(self, crawler_name: str) -> dict:
        """
        Triggers the AWS Glue crawler

        :param crawler_name: unique crawler name per AWS account
        :type crawler_name: str
        :return: Empty dictionary
        """
        self.log.info("Starting crawler %s", crawler_name)
        crawler = self.glue_client.start_crawler(Name=crawler_name)
        return crawler

    def _get_crawler_metrics(self, crawler_name: str) -> dict:
        """Return the metrics entry of a single crawler from GetCrawlerMetrics."""
        return self.glue_client.get_crawler_metrics(CrawlerNameList=[crawler_name])['CrawlerMetricsList'][0]

    def wait_for_crawler_completion(self, crawler_name: str, poll_interval: int = 5) -> str:
        """
        Waits until Glue crawler completes and
        returns the status of the latest crawl run.
        Raises AirflowException if the crawler fails or is cancelled.

        The crawler counts as finished once its ``State`` is ``READY``. Any other
        state is treated as still in progress and polled again after ``poll_interval``.

        :param crawler_name: unique crawler name per AWS account
        :type crawler_name: str
        :param poll_interval: Time (in seconds) to wait between two consecutive calls to check crawler status
        :type poll_interval: int
        :return: Crawler's status
        :raises AirflowException: if the last crawl status is FAILED or CANCELLED
        """
        failed_status = ('FAILED', 'CANCELLED')

        while True:
            crawler = self.get_crawler(crawler_name)
            crawler_state = crawler['State']
            if crawler_state == 'READY':
                self.log.info("State: %s", crawler_state)
                self.log.info("crawler_config: %s", crawler)
                crawler_status = crawler['LastCrawl']['Status']
                if crawler_status in failed_status:
                    raise AirflowException(f"Status: {crawler_status}")

                metrics = self._get_crawler_metrics(crawler_name)
                self.log.info("Status: %s", crawler_status)
                self.log.info("Last Runtime Duration (seconds): %s", metrics['LastRuntimeSeconds'])
                self.log.info("Median Runtime Duration (seconds): %s", metrics['MedianRuntimeSeconds'])
                self.log.info("Tables Created: %s", metrics['TablesCreated'])
                self.log.info("Tables Updated: %s", metrics['TablesUpdated'])
                self.log.info("Tables Deleted: %s", metrics['TablesDeleted'])

                return crawler_status

            self.log.info("Polling for AWS Glue crawler: %s ", crawler_name)
            self.log.info("State: %s", crawler_state)

            metrics = self._get_crawler_metrics(crawler_name)
            time_left = int(metrics['TimeLeftSeconds'])

            if time_left > 0:
                self.log.info("Estimated Time Left (seconds): %s", time_left)
            else:
                self.log.info("Crawler should finish soon")

            sleep(poll_interval)
diff --git a/airflow/providers/amazon/aws/operators/glue_crawler.py b/airflow/providers/amazon/aws/operators/glue_crawler.py
new file mode 100644
index 0000000..82807bc
--- /dev/null
+++ b/airflow/providers/amazon/aws/operators/glue_crawler.py
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from cached_property import cached_property
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.glue_crawler import AwsGlueCrawlerHook
+from airflow.utils.decorators import apply_defaults
+
+
class AwsGlueCrawlerOperator(BaseOperator):
    """
    Creates, updates and triggers an AWS Glue Crawler. AWS Glue Crawler is a serverless
    service that manages a catalog of metadata tables that contain the inferred
    schema, format and data types of data stores within the AWS cloud.

    If a crawler named ``config['Name']`` already exists it is updated with ``config``,
    otherwise it is created. The crawler is then started and the task blocks until
    the run completes.

    :param config: Configurations for the AWS Glue crawler
    :type config: dict
    :param aws_conn_id: aws connection to use
    :type aws_conn_id: Optional[str]
    :param poll_interval: Time (in seconds) to wait between two consecutive calls to check crawler status
    :type poll_interval: Optional[int]
    """

    ui_color = '#ededed'

    @apply_defaults
    def __init__(
        self,
        config,
        aws_conn_id='aws_default',
        poll_interval: int = 5,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.config = config
        self.aws_conn_id = aws_conn_id
        self.poll_interval = poll_interval

    @cached_property
    def hook(self) -> AwsGlueCrawlerHook:
        """Create and return an AwsGlueCrawlerHook."""
        return AwsGlueCrawlerHook(self.aws_conn_id)

    def execute(self, context):
        """
        Executes AWS Glue Crawler from Airflow

        :return: the name of the current glue crawler.
        """
        name = self.config['Name']
        glue_hook = self.hook

        # Upsert: reuse an existing crawler definition, otherwise create a new one.
        upsert = glue_hook.update_crawler if glue_hook.has_crawler(name) else glue_hook.create_crawler
        upsert(**self.config)

        self.log.info("Triggering AWS Glue Crawler")
        glue_hook.start_crawler(name)
        self.log.info("Waiting for AWS Glue Crawler")
        glue_hook.wait_for_crawler_completion(crawler_name=name, poll_interval=self.poll_interval)

        return name
diff --git a/airflow/providers/amazon/aws/sensors/glue_crawler.py b/airflow/providers/amazon/aws/sensors/glue_crawler.py
new file mode 100644
index 0000000..4ecd4a0
--- /dev/null
+++ b/airflow/providers/amazon/aws/sensors/glue_crawler.py
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.glue_crawler import AwsGlueCrawlerHook
+from airflow.sensors.base import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
class AwsGlueCrawlerSensor(BaseSensorOperator):
    """
    Waits for an AWS Glue crawler to reach any of the statuses below
    'FAILED', 'CANCELLED', 'SUCCEEDED'

    The crawler run is considered finished once the crawler state is ``READY``.
    The sensor then succeeds if the last crawl status is 'SUCCEEDED' and raises
    AirflowException for any other status.

    :param crawler_name: The AWS Glue crawler unique name
    :type crawler_name: str
    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
    :type aws_conn_id: str
    """

    @apply_defaults
    def __init__(self, *, crawler_name: str, aws_conn_id: str = 'aws_default', **kwargs) -> None:
        super().__init__(**kwargs)
        self.crawler_name = crawler_name
        self.aws_conn_id = aws_conn_id
        self.success_statuses = 'SUCCEEDED'
        self.errored_statuses = ('FAILED', 'CANCELLED')
        self.hook: Optional[AwsGlueCrawlerHook] = None

    def poke(self, context):
        """
        Check the crawler once.

        :return: False while the crawler is not READY, True if the last crawl SUCCEEDED
        :raises AirflowException: if the crawler is READY and the last crawl did not succeed
        """
        hook = self.get_hook()
        self.log.info("Poking for AWS Glue crawler: %s", self.crawler_name)
        # Fetch once so State and LastCrawl come from the same snapshot
        # (and only one GetCrawler call is made per poke).
        crawler = hook.get_crawler(self.crawler_name)
        crawler_state = crawler['State']
        if crawler_state != 'READY':
            return False

        self.log.info("State: %s", crawler_state)
        crawler_status = crawler['LastCrawl']['Status']
        if crawler_status == self.success_statuses:
            self.log.info("Status: %s", crawler_status)
            return True
        raise AirflowException(f"Status: {crawler_status}")

    def get_hook(self) -> AwsGlueCrawlerHook:
        """Returns a new or pre-existing AwsGlueCrawlerHook"""
        if self.hook is None:
            self.hook = AwsGlueCrawlerHook(aws_conn_id=self.aws_conn_id)
        return self.hook
diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml
index 2890fb0..692daa6 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -161,6 +161,7 @@ operators:
- integration-name: AWS Glue
python-modules:
- airflow.providers.amazon.aws.operators.glue
+ - airflow.providers.amazon.aws.operators.glue_crawler
- integration-name: Amazon Simple Storage Service (S3)
python-modules:
- airflow.providers.amazon.aws.operators.s3_bucket
@@ -210,6 +211,7 @@ sensors:
- integration-name: AWS Glue
python-modules:
- airflow.providers.amazon.aws.sensors.glue
+ - airflow.providers.amazon.aws.sensors.glue_crawler
- airflow.providers.amazon.aws.sensors.glue_catalog_partition
- integration-name: Amazon Redshift
python-modules:
@@ -269,6 +271,7 @@ hooks:
- integration-name: AWS Glue
python-modules:
- airflow.providers.amazon.aws.hooks.glue
+ - airflow.providers.amazon.aws.hooks.glue_crawler
- airflow.providers.amazon.aws.hooks.glue_catalog
- integration-name: Amazon Kinesis Data Firehose
python-modules:
diff --git a/tests/providers/amazon/aws/hooks/test_glue_crawler.py b/tests/providers/amazon/aws/hooks/test_glue_crawler.py
new file mode 100644
index 0000000..7230235
--- /dev/null
+++ b/tests/providers/amazon/aws/hooks/test_glue_crawler.py
@@ -0,0 +1,211 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from copy import deepcopy
+from unittest import mock
+
+from airflow.providers.amazon.aws.hooks.glue_crawler import AwsGlueCrawlerHook
+
mock_crawler_name = 'test-crawler'
mock_role_name = 'test-role'
# Sample crawler definition shaped like a Glue CreateCrawler/UpdateCrawler request.
# The tests only pass it through mocks, so nothing here is sent to AWS.
mock_config = {
    'Name': mock_crawler_name,
    'Description': 'Test glue crawler from Airflow',
    'DatabaseName': 'test_db',
    'Role': mock_role_name,
    'Targets': {
        'S3Targets': [
            {
                'Path': 's3://test-glue-crawler/foo/',
                'Exclusions': [
                    's3://test-glue-crawler/bar/',
                ],
                'ConnectionName': 'test-s3-conn',
            }
        ],
        'JdbcTargets': [
            {
                'ConnectionName': 'test-jdbc-conn',
                'Path': 'test_db/test_table',
                'Exclusions': [
                    'string',
                ],
            }
        ],
        'MongoDBTargets': [
            {'ConnectionName': 'test-mongo-conn', 'Path': 'test_db/test_collection', 'ScanAll': True}
        ],
        'DynamoDBTargets': [{'Path': 'test_db/test_table', 'scanAll': True, 'scanRate': 123.0}],
        'CatalogTargets': [
            {
                'DatabaseName': 'test_glue_db',
                'Tables': [
                    'test',
                ],
            }
        ],
    },
    'Classifiers': ['test-classifier'],
    'TablePrefix': 'test',
    'SchemaChangePolicy': {
        'UpdateBehavior': 'UPDATE_IN_DATABASE',
        'DeleteBehavior': 'DEPRECATE_IN_DATABASE',
    },
    'RecrawlPolicy': {'RecrawlBehavior': 'CRAWL_EVERYTHING'},
    'LineageConfiguration': {'CrawlerLineageSettings': 'ENABLE'},
    'Configuration': """
    {
        "Version": 1.0,
        "CrawlerOutput": {
            "Partitions": { "AddOrUpdateBehavior": "InheritFromTable" }
        }
    }
    """,
    'CrawlerSecurityConfiguration': 'test',
    'Tags': {'test': 'foo'},
}
+
+
class TestAwsGlueCrawlerHook(unittest.TestCase):
    def setUp(self):
        # A fresh hook per test so the cached ``glue_client`` never carries a
        # mock from one test into the next.
        self.hook = AwsGlueCrawlerHook(aws_conn_id="aws_default")

    def test_init(self):
        self.assertEqual(self.hook.aws_conn_id, "aws_default")

    @mock.patch.object(AwsGlueCrawlerHook, "get_conn")
    def test_has_crawler(self, mock_get_conn):
        response = self.hook.has_crawler(mock_crawler_name)
        self.assertIs(response, True)
        mock_get_conn.return_value.get_crawler.assert_called_once_with(Name=mock_crawler_name)

    @mock.patch.object(AwsGlueCrawlerHook, "get_conn")
    def test_has_crawler_crawled_doesnt_exists(self, mock_get_conn):
        class MockException(Exception):
            pass

        mock_get_conn.return_value.exceptions.EntityNotFoundException = MockException
        mock_get_conn.return_value.get_crawler.side_effect = MockException("AAA")
        response = self.hook.has_crawler(mock_crawler_name)
        self.assertIs(response, False)
        mock_get_conn.return_value.get_crawler.assert_called_once_with(Name=mock_crawler_name)

    @mock.patch.object(AwsGlueCrawlerHook, "get_conn")
    def test_update_crawler_needed(self, mock_get_conn):
        mock_get_conn.return_value.get_crawler.return_value = {'Crawler': mock_config}

        mock_config_two = deepcopy(mock_config)
        mock_config_two['Role'] = 'test-2-role'
        response = self.hook.update_crawler(**mock_config_two)
        self.assertIs(response, True)
        mock_get_conn.return_value.get_crawler.assert_called_once_with(Name=mock_crawler_name)
        mock_get_conn.return_value.update_crawler.assert_called_once_with(**mock_config_two)

    @mock.patch.object(AwsGlueCrawlerHook, "get_conn")
    def test_update_crawler_not_needed(self, mock_get_conn):
        mock_get_conn.return_value.get_crawler.return_value = {'Crawler': mock_config}
        response = self.hook.update_crawler(**mock_config)
        self.assertIs(response, False)
        mock_get_conn.return_value.get_crawler.assert_called_once_with(Name=mock_crawler_name)
        mock_get_conn.return_value.update_crawler.assert_not_called()

    @mock.patch.object(AwsGlueCrawlerHook, "get_conn")
    def test_create_crawler(self, mock_get_conn):
        mock_get_conn.return_value.create_crawler.return_value = {'Crawler': {'Name': mock_crawler_name}}
        glue_crawler = self.hook.create_crawler(**mock_config)

        self.assertEqual(glue_crawler, mock_crawler_name)
        mock_get_conn.return_value.create_crawler.assert_called_once_with(**mock_config)

    @mock.patch.object(AwsGlueCrawlerHook, "get_conn")
    def test_start_crawler(self, mock_get_conn):
        result = self.hook.start_crawler(mock_crawler_name)
        self.assertEqual(result, mock_get_conn.return_value.start_crawler.return_value)

        mock_get_conn.return_value.start_crawler.assert_called_once_with(Name=mock_crawler_name)

    @mock.patch.object(AwsGlueCrawlerHook, "get_crawler")
    @mock.patch.object(AwsGlueCrawlerHook, "get_conn")
    def test_wait_for_crawler_completion_instant_ready(self, mock_get_conn, mock_get_crawler):
        mock_get_crawler.side_effect = [
            {'State': 'READY', 'LastCrawl': {'Status': 'MOCK_STATUS'}},
        ]
        mock_get_conn.return_value.get_crawler_metrics.return_value = {
            'CrawlerMetricsList': [
                {
                    'LastRuntimeSeconds': 'TEST-A',
                    'MedianRuntimeSeconds': 'TEST-B',
                    'TablesCreated': 'TEST-C',
                    'TablesUpdated': 'TEST-D',
                    'TablesDeleted': 'TEST-E',
                }
            ]
        }
        result = self.hook.wait_for_crawler_completion(mock_crawler_name)
        self.assertEqual(result, 'MOCK_STATUS')
        mock_get_conn.assert_has_calls(
            [
                mock.call(),
                mock.call().get_crawler_metrics(CrawlerNameList=[mock_crawler_name]),
            ]
        )
        mock_get_crawler.assert_has_calls(
            [
                mock.call(mock_crawler_name),
            ]
        )

    @mock.patch.object(AwsGlueCrawlerHook, "get_crawler")
    @mock.patch.object(AwsGlueCrawlerHook, "get_conn")
    def test_wait_for_crawler_completion_failed(self, mock_get_conn, mock_get_crawler):
        from airflow.exceptions import AirflowException

        mock_get_crawler.return_value = {'State': 'READY', 'LastCrawl': {'Status': 'FAILED'}}
        with self.assertRaises(AirflowException):
            self.hook.wait_for_crawler_completion(mock_crawler_name)
        # A failed run is reported before any metrics are fetched.
        mock_get_conn.return_value.get_crawler_metrics.assert_not_called()

    @mock.patch.object(AwsGlueCrawlerHook, "get_conn")
    @mock.patch.object(AwsGlueCrawlerHook, "get_crawler")
    @mock.patch('airflow.providers.amazon.aws.hooks.glue_crawler.sleep')
    def test_wait_for_crawler_completion_retry_two_times(self, mock_sleep, mock_get_crawler, mock_get_conn):
        mock_get_crawler.side_effect = [
            {'State': 'RUNNING'},
            {'State': 'READY', 'LastCrawl': {'Status': 'MOCK_STATUS'}},
        ]
        mock_get_conn.return_value.get_crawler_metrics.side_effect = [
            {'CrawlerMetricsList': [{'TimeLeftSeconds': 12}]},
            {
                'CrawlerMetricsList': [
                    {
                        'LastRuntimeSeconds': 'TEST-A',
                        'MedianRuntimeSeconds': 'TEST-B',
                        'TablesCreated': 'TEST-C',
                        'TablesUpdated': 'TEST-D',
                        'TablesDeleted': 'TEST-E',
                    }
                ]
            },
        ]
        result = self.hook.wait_for_crawler_completion(mock_crawler_name)
        self.assertEqual(result, 'MOCK_STATUS')
        mock_get_conn.assert_has_calls(
            [
                mock.call(),
                mock.call().get_crawler_metrics(CrawlerNameList=[mock_crawler_name]),
            ]
        )
        mock_get_crawler.assert_has_calls(
            [
                mock.call(mock_crawler_name),
                mock.call(mock_crawler_name),
            ]
        )
        # Exactly one wait, between the RUNNING and READY polls, using the default interval.
        mock_sleep.assert_called_once_with(5)


if __name__ == '__main__':
    unittest.main()
diff --git a/tests/providers/amazon/aws/operators/test_glue_crawler.py b/tests/providers/amazon/aws/operators/test_glue_crawler.py
new file mode 100644
index 0000000..be10ef4
--- /dev/null
+++ b/tests/providers/amazon/aws/operators/test_glue_crawler.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import mock
+
+from airflow.providers.amazon.aws.operators.glue_crawler import AwsGlueCrawlerOperator
+
mock_crawler_name = 'test-crawler'
mock_role_name = 'test-role'
# Sample crawler definition shaped like a Glue CreateCrawler/UpdateCrawler request.
# The operator test only hands it to a mocked hook, so nothing here is sent to AWS.
mock_config = {
    'Name': mock_crawler_name,
    'Description': 'Test glue crawler from Airflow',
    'DatabaseName': 'test_db',
    'Role': mock_role_name,
    'Targets': {
        'S3Targets': [
            {
                'Path': 's3://test-glue-crawler/foo/',
                'Exclusions': [
                    's3://test-glue-crawler/bar/',
                ],
                'ConnectionName': 'test-s3-conn',
            }
        ],
        'JdbcTargets': [
            {
                'ConnectionName': 'test-jdbc-conn',
                'Path': 'test_db/test_table',
                'Exclusions': [
                    'string',
                ],
            }
        ],
        'MongoDBTargets': [
            {'ConnectionName': 'test-mongo-conn', 'Path': 'test_db/test_collection', 'ScanAll': True}
        ],
        'DynamoDBTargets': [{'Path': 'test_db/test_table', 'scanAll': True, 'scanRate': 123.0}],
        'CatalogTargets': [
            {
                'DatabaseName': 'test_glue_db',
                'Tables': [
                    'test',
                ],
            }
        ],
    },
    'Classifiers': ['test-classifier'],
    'TablePrefix': 'test',
    'SchemaChangePolicy': {
        'UpdateBehavior': 'UPDATE_IN_DATABASE',
        'DeleteBehavior': 'DEPRECATE_IN_DATABASE',
    },
    'RecrawlPolicy': {'RecrawlBehavior': 'CRAWL_EVERYTHING'},
    'LineageConfiguration': {'CrawlerLineageSettings': 'ENABLE'},
    'Configuration': """
    {
        "Version": 1.0,
        "CrawlerOutput": {
            "Partitions": { "AddOrUpdateBehavior": "InheritFromTable" }
        }
    }
    """,
    'CrawlerSecurityConfiguration': 'test',
    'Tags': {'test': 'foo'},
}
+
+
class TestAwsGlueCrawlerOperator(unittest.TestCase):
    def setUp(self):
        self.glue = AwsGlueCrawlerOperator(task_id='test_glue_crawler_operator', config=mock_config)

    @mock.patch('airflow.providers.amazon.aws.operators.glue_crawler.AwsGlueCrawlerHook')
    def test_execute_without_failure(self, mock_hook):
        """An existing crawler is updated, started and awaited; its name is returned."""
        mock_hook.return_value.has_crawler.return_value = True
        result = self.glue.execute(None)

        self.assertEqual(result, mock_crawler_name)
        mock_hook.assert_has_calls(
            [
                mock.call('aws_default'),
                mock.call().has_crawler(mock_crawler_name),
                mock.call().update_crawler(**mock_config),
                mock.call().start_crawler(mock_crawler_name),
                mock.call().wait_for_crawler_completion(crawler_name=mock_crawler_name, poll_interval=5),
            ]
        )
        mock_hook.return_value.create_crawler.assert_not_called()

    @mock.patch('airflow.providers.amazon.aws.operators.glue_crawler.AwsGlueCrawlerHook')
    def test_execute_creates_missing_crawler(self, mock_hook):
        """A missing crawler is created (not updated) before being started and awaited."""
        mock_hook.return_value.has_crawler.return_value = False
        result = self.glue.execute(None)

        self.assertEqual(result, mock_crawler_name)
        mock_hook.assert_has_calls(
            [
                mock.call('aws_default'),
                mock.call().has_crawler(mock_crawler_name),
                mock.call().create_crawler(**mock_config),
                mock.call().start_crawler(mock_crawler_name),
                mock.call().wait_for_crawler_completion(crawler_name=mock_crawler_name, poll_interval=5),
            ]
        )
        mock_hook.return_value.update_crawler.assert_not_called()
diff --git a/tests/providers/amazon/aws/sensors/test_glue_crawler.py b/tests/providers/amazon/aws/sensors/test_glue_crawler.py
new file mode 100644
index 0000000..749edfe
--- /dev/null
+++ b/tests/providers/amazon/aws/sensors/test_glue_crawler.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import mock
+
+from airflow.providers.amazon.aws.hooks.glue_crawler import AwsGlueCrawlerHook
+from airflow.providers.amazon.aws.sensors.glue_crawler import AwsGlueCrawlerSensor
+
+
class TestAwsGlueCrawlerSensor(unittest.TestCase):
    def setUp(self):
        self.sensor = AwsGlueCrawlerSensor(
            task_id='test_glue_crawler_sensor',
            crawler_name='aws_test_glue_crawler',
            poke_interval=1,
            timeout=5,
            aws_conn_id='aws_default',
        )

    def _assert_poke_raises(self, mock_get_crawler, status):
        """A READY crawler whose last crawl ended with ``status`` must fail the poke."""
        from airflow.exceptions import AirflowException

        mock_get_crawler.return_value = {'State': 'READY', 'LastCrawl': {'Status': status}}
        with self.assertRaises(AirflowException):
            self.sensor.poke(None)
        mock_get_crawler.assert_called_with('aws_test_glue_crawler')

    @mock.patch.object(AwsGlueCrawlerHook, 'get_crawler')
    def test_poke_success(self, mock_get_crawler):
        mock_get_crawler.return_value = {'State': 'READY', 'LastCrawl': {'Status': 'SUCCEEDED'}}
        self.assertTrue(self.sensor.poke(None))
        mock_get_crawler.assert_called_with('aws_test_glue_crawler')

    @mock.patch.object(AwsGlueCrawlerHook, 'get_crawler')
    def test_poke_still_running(self, mock_get_crawler):
        mock_get_crawler.return_value = {'State': 'RUNNING'}
        self.assertFalse(self.sensor.poke(None))
        mock_get_crawler.assert_called_once_with('aws_test_glue_crawler')

    @mock.patch.object(AwsGlueCrawlerHook, 'get_crawler')
    def test_poke_failed(self, mock_get_crawler):
        self._assert_poke_raises(mock_get_crawler, "FAILED")

    @mock.patch.object(AwsGlueCrawlerHook, 'get_crawler')
    def test_poke_cancelled(self, mock_get_crawler):
        self._assert_poke_raises(mock_get_crawler, "CANCELLED")


if __name__ == '__main__':
    unittest.main()