Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/12/30 20:28:46 UTC

[GitHub] [airflow] mschmo commented on a change in pull request #13072: AWS Glue Crawler Integration

mschmo commented on a change in pull request #13072:
URL: https://github.com/apache/airflow/pull/13072#discussion_r550312524



##########
File path: airflow/providers/amazon/aws/hooks/glue_crawler.py
##########
@@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+
+from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class AwsGlueCrawlerHook(AwsBaseHook):
+    """
+    Interacts with AWS Glue Crawler
+
+    :param poll_interval: Time (in seconds) to wait between two consecutive calls to check crawler status
+    :type poll_interval: int
+    :param config: Configurations for the AWS Glue crawler
+    :type config: dict
+    """
+
+    def __init__(self, poll_interval, *args, **kwargs):
+        self.poll_interval = poll_interval

Review comment:
       IMO `poll_interval` shouldn't be a required argument for the hook. It makes sense as a required argument for the sensor, but I think the hook should be more agnostic about how it interacts with the Glue API. For instance, `poll_interval` is irrelevant if I want to initialize a hook solely to get or create a crawler, and not poll for its status.
   
   I believe it may be better to pass `poll_interval` in as an argument to the `wait_for_crawler_completion` method, which is the only place it's used atm.
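
   Roughly what I have in mind (untested sketch, reusing the methods already in this PR):

```python
def wait_for_crawler_completion(self, crawler_name: str, poll_interval: int = 5) -> str:
    """
    Waits until the Glue crawler completes or fails and returns the last crawl status.

    :param crawler_name: unique crawler name per AWS account
    :param poll_interval: time (in seconds) between two consecutive status checks
    """
    while True:
        crawler_state = self.get_crawler_state(crawler_name)
        if crawler_state == 'READY':
            crawler_status = self.get_last_crawl_status(crawler_name)
            if crawler_status in ('FAILED', 'CANCELLED'):
                raise AirflowException(f"Status: {crawler_status}")
            return crawler_status
        self.log.info("Polling for AWS Glue crawler: %s, state: %s", crawler_name, crawler_state)
        sleep(poll_interval)
```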

##########
File path: airflow/providers/amazon/aws/sensors/glue_crawler.py
##########
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.glue_crawler import AwsGlueCrawlerHook
+from airflow.sensors.base import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AwsGlueCrawlerSensor(BaseSensorOperator):
+    """
+    Waits for an AWS Glue crawler to reach any of the statuses below:
+    'FAILED', 'CANCELLED', 'SUCCEEDED'

Review comment:
       FYI after the initial run the crawler's last crawl status will **always** be one of these statuses. Do you think it would make more sense to first check that the crawler is in a "READY" state? I feel like this sensor would be much more useful if it behaved similarly to the hook's `wait_for_crawler_completion` and confirmed that the crawler is not currently running.
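
   Untested sketch of what I mean (this assumes `poll_interval` is dropped from the hook's `__init__`, per my other comment):

```python
def poke(self, context):
    hook = AwsGlueCrawlerHook(aws_conn_id=self.aws_conn_id)
    self.log.info("Poking for AWS Glue crawler: %s", self.crawler_name)
    # First confirm the crawler is not currently running
    if hook.get_crawler_state(self.crawler_name) != 'READY':
        return False
    crawler_status = hook.get_last_crawl_status(self.crawler_name)
    if crawler_status == self.success_statuses:
        return True
    raise AirflowException(f"Status: {crawler_status}")
```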

##########
File path: airflow/providers/amazon/aws/sensors/glue_crawler.py
##########
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.glue_crawler import AwsGlueCrawlerHook
+from airflow.sensors.base import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AwsGlueCrawlerSensor(BaseSensorOperator):
+    """
+    Waits for an AWS Glue crawler to reach any of the statuses below:
+    'FAILED', 'CANCELLED', 'SUCCEEDED'
+
+    :param crawler_name: The AWS Glue crawler unique name
+    :type crawler_name: str
+    """
+
+    @apply_defaults
+    def __init__(self, *, crawler_name: str, aws_conn_id: str = 'aws_default', **kwargs):
+        super().__init__(**kwargs)
+        self.crawler_name = crawler_name
+        self.aws_conn_id = aws_conn_id
+        self.success_statuses = 'SUCCEEDED'
+        self.errored_statuses = ('FAILED', 'CANCELLED')
+
+    def poke(self, context):
+        hook = AwsGlueCrawlerHook(aws_conn_id=self.aws_conn_id, poll_interval=5)

Review comment:
       More reason IMO to remove `poll_interval` from the hook initialization. This is somewhat confusing because it makes it seem like we're hard-coding a 5 second poll interval, but really we're using the base sensor's `poke_interval` attribute between pokes.
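
   With `poll_interval` out of the hook's `__init__`, this line would just be, e.g.:

```python
hook = AwsGlueCrawlerHook(aws_conn_id=self.aws_conn_id)
```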

##########
File path: airflow/providers/amazon/aws/hooks/glue_crawler.py
##########
@@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+
+from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class AwsGlueCrawlerHook(AwsBaseHook):
+    """
+    Interacts with AWS Glue Crawler
+
+    :param poll_interval: Time (in seconds) to wait between two consecutive calls to check crawler status
+    :type poll_interval: int
+    :param config: Configurations for the AWS Glue crawler
+    :type config: dict
+    """
+
+    def __init__(self, poll_interval, *args, **kwargs):
+        self.poll_interval = poll_interval
+        kwargs['client_type'] = 'glue'
+        super().__init__(*args, **kwargs)
+
+    @cached_property
+    def glue_client(self):
+        """:return: AWS Glue client"""
+        return self.get_conn()
+
+    def get_iam_execution_role(self, role_name) -> str:
+        """:return: iam role for crawler execution"""
+        iam_client = self.get_client_type('iam', self.region_name)
+
+        iam_client.get_role(RoleName=role_name)
+        return role_name
+
+    def get_or_create_crawler(self, config) -> str:
+        """
+        Creates the crawler if it doesn't exist and returns the crawler name
+
+        :param config: Configurations for the AWS Glue crawler
+        :type config: dict
+        :return: Name of the crawler
+        """
+        crawler_name = config["Name"]
+        try:
+            self.glue_client.get_crawler(Name=crawler_name)
+            self.log.info("Crawler %s already exists; updating crawler", crawler_name)
+            self.glue_client.update_crawler(**config)
+        except self.glue_client.exceptions.EntityNotFoundException:
+            self.get_iam_execution_role(config["Role"])
+            self.log.info("Creating AWS Glue crawler %s", crawler_name)
+            self.glue_client.create_crawler(**config)
+        return crawler_name
+
+    def start_crawler(self, crawler_name: str) -> dict:
+        """
+        Triggers the AWS Glue crawler
+
+        :param crawler_name: unique crawler name per AWS account
+        :type crawler_name: str
+        :return: Empty dictionary
+        """
+        crawler = self.glue_client.start_crawler(Name=crawler_name)
+        return crawler
+
+    def get_crawler_state(self, crawler_name: str) -> str:
+        """
+        Get state of the Glue crawler. The crawler state can be
+        'READY', 'RUNNING', or 'STOPPING'.
+
+        :param crawler_name: unique crawler name per AWS account
+        :type crawler_name: str
+        :return: State of the Glue crawler
+        """
+        crawler = self.glue_client.get_crawler(Name=crawler_name)
+        crawler_state = crawler['Crawler']['State']
+        return crawler_state
+
+    def get_last_crawl_status(self, crawler_name: str) -> str:
+        """
+        Get the status of the latest crawl run. The crawl
+        status can be 'SUCCEEDED', 'CANCELLED', or 'FAILED'.
+
+        :param crawler_name: unique crawler name per AWS account
+        :type crawler_name: str
+        :return: Status of the Glue crawler
+        """
+        crawler = self.glue_client.get_crawler(Name=crawler_name)
+        last_crawl_status = crawler['Crawler']['LastCrawl']['Status']
+        return last_crawl_status
+
+    def wait_for_crawler_completion(self, crawler_name: str) -> str:
+        """
+        Waits until the Glue crawler completes or
+        fails and returns the last crawl status if finished.
+        Raises AirflowException when the crawler fails.
+
+        :param crawler_name: unique crawler name per AWS account
+        :type crawler_name: str
+        :return: Last crawl status of the Glue crawler
+        """
+        failed_status = ['FAILED', 'CANCELLED']
+
+        while True:
+            crawler_state = self.get_crawler_state(crawler_name)
+            if crawler_state == 'READY':
+                self.log.info("State: %s", crawler_state)
+                crawler_status = self.get_last_crawl_status(crawler_name)
+                if crawler_status in failed_status:
+                    raise AirflowException(
+                        f"Status: {crawler_status}"
+                    )  # pylint: disable=raising-format-tuple
+                else:
+                    metrics = self.get_crawler_metrics(crawler_name)
+                    self.log.info("Status: %s", crawler_status)
+                    self.log.info("Last Runtime Duration (seconds): %s", metrics['LastRuntimeSeconds'])
+                    self.log.info("Median Runtime Duration (seconds): %s", metrics['MedianRuntimeSeconds'])
+                    self.log.info("Tables Created: %s", metrics['TablesCreated'])
+                    self.log.info("Tables Updated: %s", metrics['TablesUpdated'])
+                    self.log.info("Tables Deleted: %s", metrics['TablesDeleted'])
+
+                    return crawler_status
+
+            else:
+                self.log.info("Polling for AWS Glue crawler: %s ", crawler_name)
+                self.log.info("State: %s", crawler_state)
+
+                sleep(self.poll_interval)
+
+                metrics = self.get_crawler_metrics(crawler_name)
+                time_left = int(metrics['TimeLeftSeconds'])
+
+                if time_left > 0:
+                    print('Estimated Time Left (seconds): ', time_left)
+                    self.poll_interval = time_left
+                else:
+                    print('Crawler should finish soon')

Review comment:
       Should be using `self.log` instead of `print`.
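
   e.g., keeping the same logic and just swapping the logger in:

```python
if time_left > 0:
    self.log.info("Estimated Time Left (seconds): %s", time_left)
    self.poll_interval = time_left
else:
    self.log.info("Crawler should finish soon")
```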




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org