Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/19 07:00:38 UTC

[airflow] branch main updated: Glue Job Driver logging (#25142)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5a77c46bf0 Glue Job Driver logging (#25142)
5a77c46bf0 is described below

commit 5a77c46bf0ee9d154467147d5d7e976361b8ee27
Author: D. Ferruzzi <fe...@amazon.com>
AuthorDate: Tue Jul 19 07:00:30 2022 +0000

    Glue Job Driver logging (#25142)
    
    Adds an optional `verbose` boolean to the Glue job operator and sensor, defaulting to False. If set to True, Glue job driver logs are passed through to the Airflow task logs.
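    
    A minimal usage sketch (hedged: the job name, S3 locations, and IAM role
    below are illustrative placeholders, not part of this change):
    
        from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
    
        submit_job = GlueJobOperator(
            task_id='submit_glue_job',
            job_name='my_glue_job',                       # hypothetical Glue job name
            script_location='s3://my-bucket/scripts/job.py',
            s3_bucket='my-bucket',
            iam_role_name='my-glue-role',
            wait_for_completion=True,
            verbose=True,  # new flag: stream Glue driver logs into the Airflow task log
        )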
---
 airflow/providers/amazon/aws/hooks/glue.py        | 97 +++++++++++++++++++----
 airflow/providers/amazon/aws/operators/glue.py    |  5 +-
 airflow/providers/amazon/aws/sensors/glue.py      | 50 +++++++++---
 tests/providers/amazon/aws/operators/test_glue.py | 54 ++++++++++++-
 tests/providers/amazon/aws/sensors/test_glue.py   | 97 +++++++++++++++++++++--
 5 files changed, 265 insertions(+), 38 deletions(-)

diff --git a/airflow/providers/amazon/aws/hooks/glue.py b/airflow/providers/amazon/aws/hooks/glue.py
index 2fc342675d..9201b9f70b 100644
--- a/airflow/providers/amazon/aws/hooks/glue.py
+++ b/airflow/providers/amazon/aws/hooks/glue.py
@@ -20,9 +20,18 @@ import time
 import warnings
 from typing import Dict, List, Optional
 
+import boto3
+
 from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
 
+DEFAULT_LOG_SUFFIX = 'output'
+FAILURE_LOG_SUFFIX = 'error'
+# A filter value of ' ' translates to "match all".
+# see: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
+DEFAULT_LOG_FILTER = ' '
+FAILURE_LOG_FILTER = '?ERROR ?Exception'
+
 
 class GlueJobHook(AwsBaseHook):
     """
@@ -136,32 +145,92 @@ class GlueJobHook(AwsBaseHook):
         job_run = glue_client.get_job_run(JobName=job_name, RunId=run_id, PredecessorsIncluded=True)
         return job_run['JobRun']['JobRunState']
 
-    def job_completion(self, job_name: str, run_id: str) -> Dict[str, str]:
+    def print_job_logs(
+        self,
+        job_name: str,
+        run_id: str,
+        job_failed: bool = False,
+        next_token: Optional[str] = None,
+    ) -> Optional[str]:
+        """Prints the batch of logs to the Airflow task log and returns nextToken."""
+        log_client = boto3.client('logs')
+        response = {}
+
+        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
+        log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)['JobRun']['LogGroupName']
+        log_group_suffix = FAILURE_LOG_SUFFIX if job_failed else DEFAULT_LOG_SUFFIX
+        log_group_name = f'{log_group_prefix}/{log_group_suffix}'
+
+        try:
+            if next_token:
+                response = log_client.filter_log_events(
+                    logGroupName=log_group_name,
+                    logStreamNames=[run_id],
+                    filterPattern=filter_pattern,
+                    nextToken=next_token,
+                )
+            else:
+                response = log_client.filter_log_events(
+                    logGroupName=log_group_name,
+                    logStreamNames=[run_id],
+                    filterPattern=filter_pattern,
+                )
+            if len(response['events']):
+                messages = '\t'.join([event['message'] for event in response['events']])
+                self.log.info('Glue Job Run Logs:\n\t%s', messages)
+
+        except log_client.exceptions.ResourceNotFoundException:
+            self.log.warning(
+                'No new Glue driver logs found. This might be because there are no new logs, '
+                'or it might be an error.\nIf the error persists, check the CloudWatch dashboard '
+                f'at: https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home'
+            )
+
+        # If no new log events are available, the response carries no nextToken.
+        # In that case, check the same token again on the next pass.
+        return response.get('nextToken') or next_token
+
+    def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> Dict[str, str]:
         """
         Waits until the Glue job with the given job_name completes or
         fails, and returns the final state if finished.
         Raises AirflowException when the job fails.
         :param job_name: unique job name per AWS account
         :param run_id: The job-run ID of the predecessor job run
+        :param verbose: If True, Glue Job Run logs are shown in the Airflow Task Logs. (default: False)
         :return: Dict of JobRunState and JobRunId
         """
         failed_states = ['FAILED', 'TIMEOUT']
         finished_states = ['SUCCEEDED', 'STOPPED']
+        next_log_token = None
+        job_failed = False
 
         while True:
-            job_run_state = self.get_job_state(job_name, run_id)
-            if job_run_state in finished_states:
-                self.log.info("Exiting Job %s Run State: %s", run_id, job_run_state)
-                return {'JobRunState': job_run_state, 'JobRunId': run_id}
-            if job_run_state in failed_states:
-                job_error_message = f"Exiting Job {run_id} Run State: {job_run_state}"
-                self.log.info(job_error_message)
-                raise AirflowException(job_error_message)
-            else:
-                self.log.info(
-                    "Polling for AWS Glue Job %s current run state with status %s", job_name, job_run_state
-                )
-                time.sleep(self.JOB_POLL_INTERVAL)
+            try:
+                job_run_state = self.get_job_state(job_name, run_id)
+                if job_run_state in finished_states:
+                    self.log.info('Exiting Job %s Run State: %s', run_id, job_run_state)
+                    return {'JobRunState': job_run_state, 'JobRunId': run_id}
+                if job_run_state in failed_states:
+                    job_failed = True
+                    job_error_message = f'Exiting Job {run_id} Run State: {job_run_state}'
+                    self.log.info(job_error_message)
+                    raise AirflowException(job_error_message)
+                else:
+                    self.log.info(
+                        'Polling for AWS Glue Job %s current run state with status %s',
+                        job_name,
+                        job_run_state,
+                    )
+                    time.sleep(self.JOB_POLL_INTERVAL)
+            finally:
+                if verbose:
+                    next_log_token = self.print_job_logs(
+                        job_name=job_name,
+                        run_id=run_id,
+                        job_failed=job_failed,
+                        next_token=next_log_token,
+                    )
 
     def get_or_create_glue_job(self) -> str:
         """
diff --git a/airflow/providers/amazon/aws/operators/glue.py b/airflow/providers/amazon/aws/operators/glue.py
index 4d42b0b440..424cd75f36 100644
--- a/airflow/providers/amazon/aws/operators/glue.py
+++ b/airflow/providers/amazon/aws/operators/glue.py
@@ -51,6 +51,7 @@ class GlueJobOperator(BaseOperator):
     :param create_job_kwargs: Extra arguments for Glue Job Creation
     :param run_job_kwargs: Extra arguments for Glue Job Run
     :param wait_for_completion: Whether or not wait for job run completion. (default: True)
+    :param verbose: If True, Glue Job Run logs are shown in the Airflow Task Logs. (default: False)
     """
 
     template_fields: Sequence[str] = (
@@ -84,6 +85,7 @@ class GlueJobOperator(BaseOperator):
         create_job_kwargs: Optional[dict] = None,
         run_job_kwargs: Optional[dict] = None,
         wait_for_completion: bool = True,
+        verbose: bool = False,
         **kwargs,
     ):
         super().__init__(**kwargs)
@@ -103,6 +105,7 @@ class GlueJobOperator(BaseOperator):
         self.create_job_kwargs = create_job_kwargs
         self.run_job_kwargs = run_job_kwargs or {}
         self.wait_for_completion = wait_for_completion
+        self.verbose = verbose
 
     def execute(self, context: 'Context'):
         """
@@ -141,7 +144,7 @@ class GlueJobOperator(BaseOperator):
         )
         glue_job_run = glue_job.initialize_job(self.script_args, self.run_job_kwargs)
         if self.wait_for_completion:
-            glue_job_run = glue_job.job_completion(self.job_name, glue_job_run['JobRunId'])
+            glue_job_run = glue_job.job_completion(self.job_name, glue_job_run['JobRunId'], self.verbose)
             self.log.info(
                 "AWS Glue Job: %s status: %s. Run Id: %s",
                 self.job_name,
diff --git a/airflow/providers/amazon/aws/sensors/glue.py b/airflow/providers/amazon/aws/sensors/glue.py
index 525e7b8ee6..a7989f64ec 100644
--- a/airflow/providers/amazon/aws/sensors/glue.py
+++ b/airflow/providers/amazon/aws/sensors/glue.py
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 import warnings
-from typing import TYPE_CHECKING, Sequence
+from typing import TYPE_CHECKING, List, Optional, Sequence
 
 from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.glue import GlueJobHook
@@ -37,30 +37,54 @@ class GlueJobSensor(BaseSensorOperator):
 
     :param job_name: The AWS Glue Job unique name
     :param run_id: The AWS Glue current running job identifier
+    :param verbose: If True, Glue Job Run logs are shown in the Airflow Task Logs. (default: False)
     """
 
     template_fields: Sequence[str] = ('job_name', 'run_id')
 
-    def __init__(self, *, job_name: str, run_id: str, aws_conn_id: str = 'aws_default', **kwargs):
+    def __init__(
+        self,
+        *,
+        job_name: str,
+        run_id: str,
+        verbose: bool = False,
+        aws_conn_id: str = 'aws_default',
+        **kwargs,
+    ):
         super().__init__(**kwargs)
         self.job_name = job_name
         self.run_id = run_id
+        self.verbose = verbose
         self.aws_conn_id = aws_conn_id
-        self.success_states = ['SUCCEEDED']
-        self.errored_states = ['FAILED', 'STOPPED', 'TIMEOUT']
+        self.success_states: List[str] = ['SUCCEEDED']
+        self.errored_states: List[str] = ['FAILED', 'STOPPED', 'TIMEOUT']
+        self.next_log_token: Optional[str] = None
 
     def poke(self, context: 'Context'):
         hook = GlueJobHook(aws_conn_id=self.aws_conn_id)
-        self.log.info("Poking for job run status :for Glue Job %s and ID %s", self.job_name, self.run_id)
+        self.log.info('Poking for job run status for Glue Job %s and ID %s', self.job_name, self.run_id)
         job_state = hook.get_job_state(job_name=self.job_name, run_id=self.run_id)
-        if job_state in self.success_states:
-            self.log.info("Exiting Job %s Run State: %s", self.run_id, job_state)
-            return True
-        elif job_state in self.errored_states:
-            job_error_message = f"Exiting Job {self.run_id} Run State: {job_state}"
-            raise AirflowException(job_error_message)
-        else:
-            return False
+        job_failed = False
+
+        try:
+            if job_state in self.success_states:
+                self.log.info('Exiting Job %s Run State: %s', self.run_id, job_state)
+                return True
+            elif job_state in self.errored_states:
+                job_failed = True
+                job_error_message = f'Exiting Job {self.run_id} Run State: {job_state}'
+                self.log.info(job_error_message)
+                raise AirflowException(job_error_message)
+            else:
+                return False
+        finally:
+            if self.verbose:
+                self.next_log_token = hook.print_job_logs(
+                    job_name=self.job_name,
+                    run_id=self.run_id,
+                    job_failed=job_failed,
+                    next_token=self.next_log_token,
+                )
 
 
 class AwsGlueJobSensor(GlueJobSensor):
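
A hedged sensor example to match (the upstream task id 'submit_glue_job' and
its XCom return value are assumptions; GlueJobOperator.execute returns the
JobRunId when wait_for_completion=False, as the tests below exercise):

    from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor

    wait_for_job = GlueJobSensor(
        task_id='wait_for_glue_job',
        job_name='my_glue_job',  # hypothetical Glue job name
        run_id="{{ ti.xcom_pull(task_ids='submit_glue_job') }}",
        verbose=True,      # print new Glue driver logs on every poke
        poke_interval=60,
    )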
diff --git a/tests/providers/amazon/aws/operators/test_glue.py b/tests/providers/amazon/aws/operators/test_glue.py
index be60057392..0dc99a796e 100644
--- a/tests/providers/amazon/aws/operators/test_glue.py
+++ b/tests/providers/amazon/aws/operators/test_glue.py
@@ -20,7 +20,7 @@ from unittest import mock
 
 from parameterized import parameterized
 
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.providers.amazon.aws.hooks.glue import GlueJobHook
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
@@ -29,7 +29,7 @@ from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
 class TestGlueJobOperator(unittest.TestCase):
     @mock.patch('airflow.providers.amazon.aws.hooks.glue.GlueJobHook')
     def setUp(self, glue_hook_mock):
-        configuration.load_test_config()
+        conf.load_test_config()
 
         self.glue_hook_mock = glue_hook_mock
 
@@ -39,12 +39,19 @@ class TestGlueJobOperator(unittest.TestCase):
             "/glue-examples/glue-scripts/sample_aws_glue_job.py",
         ]
     )
+    @mock.patch.object(GlueJobHook, 'print_job_logs')
     @mock.patch.object(GlueJobHook, 'get_job_state')
     @mock.patch.object(GlueJobHook, 'initialize_job')
     @mock.patch.object(GlueJobHook, "get_conn")
     @mock.patch.object(S3Hook, "load_file")
     def test_execute_without_failure(
-        self, script_location, mock_load_file, mock_get_conn, mock_initialize_job, mock_get_job_state
+        self,
+        script_location,
+        mock_load_file,
+        mock_get_conn,
+        mock_initialize_job,
+        mock_get_job_state,
+        mock_print_job_logs,
     ):
         glue = GlueJobOperator(
             task_id='test_glue_operator',
@@ -57,16 +64,52 @@ class TestGlueJobOperator(unittest.TestCase):
         )
         mock_initialize_job.return_value = {'JobRunState': 'RUNNING', 'JobRunId': '11111'}
         mock_get_job_state.return_value = 'SUCCEEDED'
+
         glue.execute({})
+
         mock_initialize_job.assert_called_once_with({}, {})
+        mock_print_job_logs.assert_not_called()
         assert glue.job_name == 'my_test_job'
 
+    @mock.patch.object(GlueJobHook, 'print_job_logs')
+    @mock.patch.object(GlueJobHook, 'get_job_state')
+    @mock.patch.object(GlueJobHook, 'initialize_job')
+    @mock.patch.object(GlueJobHook, "get_conn")
+    @mock.patch.object(S3Hook, "load_file")
+    def test_execute_with_verbose_logging(
+        self, mock_load_file, mock_get_conn, mock_initialize_job, mock_get_job_state, mock_print_job_logs
+    ):
+        job_name = 'test_job_name'
+        job_run_id = '11111'
+        glue = GlueJobOperator(
+            task_id='test_glue_operator',
+            job_name=job_name,
+            script_location='s3_uri',
+            s3_bucket='bucket_name',
+            iam_role_name='role_arn',
+            verbose=True,
+        )
+        mock_initialize_job.return_value = {'JobRunState': 'RUNNING', 'JobRunId': job_run_id}
+        mock_get_job_state.return_value = 'SUCCEEDED'
+
+        glue.execute({})
+
+        mock_initialize_job.assert_called_once_with({}, {})
+        mock_print_job_logs.assert_called_once_with(
+            job_name=job_name,
+            run_id=job_run_id,
+            job_failed=False,
+            next_token=None,
+        )
+        assert glue.job_name == job_name
+
+    @mock.patch.object(GlueJobHook, 'print_job_logs')
     @mock.patch.object(GlueJobHook, 'job_completion')
     @mock.patch.object(GlueJobHook, 'initialize_job')
     @mock.patch.object(GlueJobHook, "get_conn")
     @mock.patch.object(S3Hook, "load_file")
     def test_execute_without_waiting_for_completion(
-        self, mock_load_file, mock_get_conn, mock_initialize_job, mock_job_completion
+        self, mock_load_file, mock_get_conn, mock_initialize_job, mock_job_completion, mock_print_job_logs
     ):
         glue = GlueJobOperator(
             task_id='test_glue_operator',
@@ -79,8 +122,11 @@ class TestGlueJobOperator(unittest.TestCase):
             wait_for_completion=False,
         )
         mock_initialize_job.return_value = {'JobRunState': 'RUNNING', 'JobRunId': '11111'}
+
         job_run_id = glue.execute({})
+
         mock_initialize_job.assert_called_once_with({}, {})
         mock_job_completion.assert_not_called()
+        mock_print_job_logs.assert_not_called()
         assert glue.job_name == 'my_test_job'
         assert job_run_id == '11111'
diff --git a/tests/providers/amazon/aws/sensors/test_glue.py b/tests/providers/amazon/aws/sensors/test_glue.py
index 672c612353..8fb96ac946 100644
--- a/tests/providers/amazon/aws/sensors/test_glue.py
+++ b/tests/providers/amazon/aws/sensors/test_glue.py
@@ -17,19 +17,24 @@
 
 import unittest
 from unittest import mock
+from unittest.mock import ANY
 
-from airflow import configuration
+import pytest
+
+from airflow import AirflowException
+from airflow.configuration import conf
 from airflow.providers.amazon.aws.hooks.glue import GlueJobHook
 from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
 
 
 class TestGlueJobSensor(unittest.TestCase):
     def setUp(self):
-        configuration.load_test_config()
+        conf.load_test_config()
 
+    @mock.patch.object(GlueJobHook, 'print_job_logs')
     @mock.patch.object(GlueJobHook, 'get_conn')
     @mock.patch.object(GlueJobHook, 'get_job_state')
-    def test_poke(self, mock_get_job_state, mock_conn):
+    def test_poke(self, mock_get_job_state, mock_conn, mock_print_job_logs):
         mock_conn.return_value.get_job_run()
         mock_get_job_state.return_value = 'SUCCEEDED'
         op = GlueJobSensor(
@@ -38,13 +43,40 @@ class TestGlueJobSensor(unittest.TestCase):
             run_id='5152fgsfsjhsh61661',
             poke_interval=1,
             timeout=5,
-            aws_conn_id='aws_default',
         )
+
+        assert op.poke({})
+        mock_print_job_logs.assert_not_called()
+
+    @mock.patch.object(GlueJobHook, 'print_job_logs')
+    @mock.patch.object(GlueJobHook, 'get_conn')
+    @mock.patch.object(GlueJobHook, 'get_job_state')
+    def test_poke_with_verbose_logging(self, mock_get_job_state, mock_conn, mock_print_job_logs):
+        mock_conn.return_value.get_job_run()
+        mock_get_job_state.return_value = 'SUCCEEDED'
+        job_name = 'job_name'
+        job_run_id = 'job_run_id'
+        op = GlueJobSensor(
+            task_id='test_glue_job_sensor',
+            job_name=job_name,
+            run_id=job_run_id,
+            poke_interval=1,
+            timeout=5,
+            verbose=True,
+        )
+
         assert op.poke({})
+        mock_print_job_logs.assert_called_once_with(
+            job_name=job_name,
+            run_id=job_run_id,
+            job_failed=False,
+            next_token=ANY,
+        )
 
+    @mock.patch.object(GlueJobHook, 'print_job_logs')
     @mock.patch.object(GlueJobHook, 'get_conn')
     @mock.patch.object(GlueJobHook, 'get_job_state')
-    def test_poke_false(self, mock_get_job_state, mock_conn):
+    def test_poke_false(self, mock_get_job_state, mock_conn, mock_print_job_logs):
         mock_conn.return_value.get_job_run()
         mock_get_job_state.return_value = 'RUNNING'
         op = GlueJobSensor(
@@ -53,9 +85,62 @@ class TestGlueJobSensor(unittest.TestCase):
             run_id='5152fgsfsjhsh61661',
             poke_interval=1,
             timeout=5,
-            aws_conn_id='aws_default',
         )
+
         assert not op.poke({})
+        mock_print_job_logs.assert_not_called()
+
+    @mock.patch.object(GlueJobHook, 'print_job_logs')
+    @mock.patch.object(GlueJobHook, 'get_conn')
+    @mock.patch.object(GlueJobHook, 'get_job_state')
+    def test_poke_false_with_verbose_logging(self, mock_get_job_state, mock_conn, mock_print_job_logs):
+        mock_conn.return_value.get_job_run()
+        mock_get_job_state.return_value = 'RUNNING'
+        job_name = 'job_name'
+        job_run_id = 'job_run_id'
+        op = GlueJobSensor(
+            task_id='test_glue_job_sensor',
+            job_name=job_name,
+            run_id=job_run_id,
+            poke_interval=1,
+            timeout=5,
+            verbose=True,
+        )
+
+        assert not op.poke({})
+        mock_print_job_logs.assert_called_once_with(
+            job_name=job_name,
+            run_id=job_run_id,
+            job_failed=False,
+            next_token=ANY,
+        )
+
+    @mock.patch.object(GlueJobHook, 'print_job_logs')
+    @mock.patch.object(GlueJobHook, 'get_conn')
+    @mock.patch.object(GlueJobHook, 'get_job_state')
+    def test_poke_failed_job_with_verbose_logging(self, mock_get_job_state, mock_conn, mock_print_job_logs):
+        mock_conn.return_value.get_job_run()
+        mock_get_job_state.return_value = 'FAILED'
+        job_name = 'job_name'
+        job_run_id = 'job_run_id'
+        op = GlueJobSensor(
+            task_id='test_glue_job_sensor',
+            job_name=job_name,
+            run_id=job_run_id,
+            poke_interval=1,
+            timeout=5,
+            verbose=True,
+        )
+
+        with pytest.raises(AirflowException):
+            op.poke({})
+        mock_print_job_logs.assert_called_once_with(
+            job_name=job_name,
+            run_id=job_run_id,
+            job_failed=True,
+            next_token=ANY,
+        )
 
 
 if __name__ == '__main__':