Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/19 07:00:38 UTC
[airflow] branch main updated: Glue Job Driver logging (#25142)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5a77c46bf0 Glue Job Driver logging (#25142)
5a77c46bf0 is described below
commit 5a77c46bf0ee9d154467147d5d7e976361b8ee27
Author: D. Ferruzzi <fe...@amazon.com>
AuthorDate: Tue Jul 19 07:00:30 2022 +0000
Glue Job Driver logging (#25142)
Adds an optional `verbose` boolean to the Glue job operator and sensor, defaulting to False. If set to True, Glue job run logs are passed through to the Airflow task logs.
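For reference, a minimal usage sketch of the new flag (the DAG id, job name, bucket, role, script path, and run id below are illustrative placeholders, not part of this change):

import pendulum

from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor

with DAG(
    dag_id='example_glue_verbose_logging',                   # illustrative DAG id
    start_date=pendulum.datetime(2022, 7, 1, tz='UTC'),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Submit the job without blocking; the run id is returned via XCom.
    # (Setting wait_for_completion=True together with verbose=True on the operator
    # would instead stream the driver logs while the operator itself waits.)
    submit_job = GlueJobOperator(
        task_id='submit_glue_job',
        job_name='my_glue_job',                               # illustrative name
        script_location='s3://my-bucket/scripts/my_job.py',   # illustrative path
        s3_bucket='my-bucket',
        iam_role_name='my-glue-role',
        wait_for_completion=False,
    )

    # Poll the run; with verbose=True each poke also forwards any new Glue driver
    # log batches into the Airflow task log.
    wait_for_job = GlueJobSensor(
        task_id='wait_for_glue_job',
        job_name='my_glue_job',
        run_id="{{ task_instance.xcom_pull(task_ids='submit_glue_job') }}",
        verbose=True,
    )

    submit_job >> wait_for_job

The verbose flag only has an effect while a task is waiting on the run, i.e. on the sensor's poke loop or on the operator when wait_for_completion=True.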
---
airflow/providers/amazon/aws/hooks/glue.py | 97 +++++++++++++++++++----
airflow/providers/amazon/aws/operators/glue.py | 5 +-
airflow/providers/amazon/aws/sensors/glue.py | 50 +++++++++---
tests/providers/amazon/aws/operators/test_glue.py | 54 ++++++++++++-
tests/providers/amazon/aws/sensors/test_glue.py | 97 +++++++++++++++++++++--
5 files changed, 265 insertions(+), 38 deletions(-)
diff --git a/airflow/providers/amazon/aws/hooks/glue.py b/airflow/providers/amazon/aws/hooks/glue.py
index 2fc342675d..9201b9f70b 100644
--- a/airflow/providers/amazon/aws/hooks/glue.py
+++ b/airflow/providers/amazon/aws/hooks/glue.py
@@ -20,9 +20,18 @@ import time
import warnings
from typing import Dict, List, Optional
+import boto3
+
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+DEFAULT_LOG_SUFFIX = 'output'
+FAILURE_LOG_SUFFIX = 'error'
+# A filter value of ' ' translates to "match all".
+# see: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
+DEFAULT_LOG_FILTER = ' '
+FAILURE_LOG_FILTER = '?ERROR ?Exception'
+
class GlueJobHook(AwsBaseHook):
"""
@@ -136,32 +145,92 @@ class GlueJobHook(AwsBaseHook):
job_run = glue_client.get_job_run(JobName=job_name, RunId=run_id, PredecessorsIncluded=True)
return job_run['JobRun']['JobRunState']
- def job_completion(self, job_name: str, run_id: str) -> Dict[str, str]:
+ def print_job_logs(
+ self,
+ job_name: str,
+ run_id: str,
+ job_failed: bool = False,
+ next_token: Optional[str] = None,
+ ) -> Optional[str]:
+ """Prints the batch of logs to the Airflow task log and returns nextToken."""
+ log_client = boto3.client('logs')
+ response = {}
+
+ filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
+ log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)['JobRun']['LogGroupName']
+ log_group_suffix = FAILURE_LOG_SUFFIX if job_failed else DEFAULT_LOG_SUFFIX
+ log_group_name = f'{log_group_prefix}/{log_group_suffix}'
+
+ try:
+ if next_token:
+ response = log_client.filter_log_events(
+ logGroupName=log_group_name,
+ logStreamNames=[run_id],
+ filterPattern=filter_pattern,
+ nextToken=next_token,
+ )
+ else:
+ response = log_client.filter_log_events(
+ logGroupName=log_group_name,
+ logStreamNames=[run_id],
+ filterPattern=filter_pattern,
+ )
+ if len(response['events']):
+ messages = '\t'.join([event['message'] for event in response['events']])
+ self.log.info('Glue Job Run Logs:\n\t%s', messages)
+
+ except log_client.exceptions.ResourceNotFoundException:
+ self.log.warning(
+ 'No new Glue driver logs found. This might be because there are no new logs, '
+ 'or might be an error.\nIf the error persists, check the CloudWatch dashboard '
+ f'at: https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home'
+ )
+
+ # If no new log events are available, the filter_log_events response omits nextToken.
+ # In that case, check the same token again next pass.
+ return response.get('nextToken') or next_token
+
+ def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> Dict[str, str]:
"""
Waits until Glue job with job_name completes or
fails and return final state if finished.
Raises AirflowException when the job failed
:param job_name: unique job name per AWS account
:param run_id: The job-run ID of the predecessor job run
+ :param verbose: If True, more Glue Job Run logs show in the Airflow Task Logs. (default: False)
:return: Dict of JobRunState and JobRunId
"""
failed_states = ['FAILED', 'TIMEOUT']
finished_states = ['SUCCEEDED', 'STOPPED']
+ next_log_token = None
+ job_failed = False
while True:
- job_run_state = self.get_job_state(job_name, run_id)
- if job_run_state in finished_states:
- self.log.info("Exiting Job %s Run State: %s", run_id, job_run_state)
- return {'JobRunState': job_run_state, 'JobRunId': run_id}
- if job_run_state in failed_states:
- job_error_message = f"Exiting Job {run_id} Run State: {job_run_state}"
- self.log.info(job_error_message)
- raise AirflowException(job_error_message)
- else:
- self.log.info(
- "Polling for AWS Glue Job %s current run state with status %s", job_name, job_run_state
- )
- time.sleep(self.JOB_POLL_INTERVAL)
+ try:
+ job_run_state = self.get_job_state(job_name, run_id)
+ if job_run_state in finished_states:
+ self.log.info('Exiting Job %s Run State: %s', run_id, job_run_state)
+ return {'JobRunState': job_run_state, 'JobRunId': run_id}
+ if job_run_state in failed_states:
+ job_failed = True
+ job_error_message = f'Exiting Job {run_id} Run State: {job_run_state}'
+ self.log.info(job_error_message)
+ raise AirflowException(job_error_message)
+ else:
+ self.log.info(
+ 'Polling for AWS Glue Job %s current run state with status %s',
+ job_name,
+ job_run_state,
+ )
+ time.sleep(self.JOB_POLL_INTERVAL)
+ finally:
+ if verbose:
+ next_log_token = self.print_job_logs(
+ job_name=job_name,
+ run_id=run_id,
+ job_failed=job_failed,
+ next_token=next_log_token,
+ )
def get_or_create_glue_job(self) -> str:
"""
diff --git a/airflow/providers/amazon/aws/operators/glue.py b/airflow/providers/amazon/aws/operators/glue.py
index 4d42b0b440..424cd75f36 100644
--- a/airflow/providers/amazon/aws/operators/glue.py
+++ b/airflow/providers/amazon/aws/operators/glue.py
@@ -51,6 +51,7 @@ class GlueJobOperator(BaseOperator):
:param create_job_kwargs: Extra arguments for Glue Job Creation
:param run_job_kwargs: Extra arguments for Glue Job Run
:param wait_for_completion: Whether or not wait for job run completion. (default: True)
+ :param verbose: If True, Glue Job Run logs show in the Airflow Task Logs. (default: False)
"""
template_fields: Sequence[str] = (
@@ -84,6 +85,7 @@ class GlueJobOperator(BaseOperator):
create_job_kwargs: Optional[dict] = None,
run_job_kwargs: Optional[dict] = None,
wait_for_completion: bool = True,
+ verbose: bool = False,
**kwargs,
):
super().__init__(**kwargs)
@@ -103,6 +105,7 @@ class GlueJobOperator(BaseOperator):
self.create_job_kwargs = create_job_kwargs
self.run_job_kwargs = run_job_kwargs or {}
self.wait_for_completion = wait_for_completion
+ self.verbose = verbose
def execute(self, context: 'Context'):
"""
@@ -141,7 +144,7 @@ class GlueJobOperator(BaseOperator):
)
glue_job_run = glue_job.initialize_job(self.script_args, self.run_job_kwargs)
if self.wait_for_completion:
- glue_job_run = glue_job.job_completion(self.job_name, glue_job_run['JobRunId'])
+ glue_job_run = glue_job.job_completion(self.job_name, glue_job_run['JobRunId'], self.verbose)
self.log.info(
"AWS Glue Job: %s status: %s. Run Id: %s",
self.job_name,
diff --git a/airflow/providers/amazon/aws/sensors/glue.py b/airflow/providers/amazon/aws/sensors/glue.py
index 525e7b8ee6..a7989f64ec 100644
--- a/airflow/providers/amazon/aws/sensors/glue.py
+++ b/airflow/providers/amazon/aws/sensors/glue.py
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
import warnings
-from typing import TYPE_CHECKING, Sequence
+from typing import TYPE_CHECKING, List, Optional, Sequence
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.glue import GlueJobHook
@@ -37,30 +37,54 @@ class GlueJobSensor(BaseSensorOperator):
:param job_name: The AWS Glue Job unique name
:param run_id: The AWS Glue current running job identifier
+ :param verbose: If True, more Glue Job Run logs show in the Airflow Task Logs. (default: False)
"""
template_fields: Sequence[str] = ('job_name', 'run_id')
- def __init__(self, *, job_name: str, run_id: str, aws_conn_id: str = 'aws_default', **kwargs):
+ def __init__(
+ self,
+ *,
+ job_name: str,
+ run_id: str,
+ verbose: bool = False,
+ aws_conn_id: str = 'aws_default',
+ **kwargs,
+ ):
super().__init__(**kwargs)
self.job_name = job_name
self.run_id = run_id
+ self.verbose = verbose
self.aws_conn_id = aws_conn_id
- self.success_states = ['SUCCEEDED']
- self.errored_states = ['FAILED', 'STOPPED', 'TIMEOUT']
+ self.success_states: List[str] = ['SUCCEEDED']
+ self.errored_states: List[str] = ['FAILED', 'STOPPED', 'TIMEOUT']
+ self.next_log_token: Optional[str] = None
def poke(self, context: 'Context'):
hook = GlueJobHook(aws_conn_id=self.aws_conn_id)
- self.log.info("Poking for job run status :for Glue Job %s and ID %s", self.job_name, self.run_id)
+ self.log.info('Poking for job run status for Glue Job %s and ID %s', self.job_name, self.run_id)
job_state = hook.get_job_state(job_name=self.job_name, run_id=self.run_id)
- if job_state in self.success_states:
- self.log.info("Exiting Job %s Run State: %s", self.run_id, job_state)
- return True
- elif job_state in self.errored_states:
- job_error_message = f"Exiting Job {self.run_id} Run State: {job_state}"
- raise AirflowException(job_error_message)
- else:
- return False
+ job_failed = False
+
+ try:
+ if job_state in self.success_states:
+ self.log.info('Exiting Job %s Run State: %s', self.run_id, job_state)
+ return True
+ elif job_state in self.errored_states:
+ job_failed = True
+ job_error_message = f'Exiting Job {self.run_id} Run State: {job_state}'
+ self.log.info(job_error_message)
+ raise AirflowException(job_error_message)
+ else:
+ return False
+ finally:
+ if self.verbose:
+ self.next_log_token = hook.print_job_logs(
+ job_name=self.job_name,
+ run_id=self.run_id,
+ job_failed=job_failed,
+ next_token=self.next_log_token,
+ )
class AwsGlueJobSensor(GlueJobSensor):
diff --git a/tests/providers/amazon/aws/operators/test_glue.py b/tests/providers/amazon/aws/operators/test_glue.py
index be60057392..0dc99a796e 100644
--- a/tests/providers/amazon/aws/operators/test_glue.py
+++ b/tests/providers/amazon/aws/operators/test_glue.py
@@ -20,7 +20,7 @@ from unittest import mock
from parameterized import parameterized
-from airflow import configuration
+from airflow.configuration import conf
from airflow.providers.amazon.aws.hooks.glue import GlueJobHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
@@ -29,7 +29,7 @@ from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
class TestGlueJobOperator(unittest.TestCase):
@mock.patch('airflow.providers.amazon.aws.hooks.glue.GlueJobHook')
def setUp(self, glue_hook_mock):
- configuration.load_test_config()
+ conf.load_test_config()
self.glue_hook_mock = glue_hook_mock
@@ -39,12 +39,19 @@ class TestGlueJobOperator(unittest.TestCase):
"/glue-examples/glue-scripts/sample_aws_glue_job.py",
]
)
+ @mock.patch.object(GlueJobHook, 'print_job_logs')
@mock.patch.object(GlueJobHook, 'get_job_state')
@mock.patch.object(GlueJobHook, 'initialize_job')
@mock.patch.object(GlueJobHook, "get_conn")
@mock.patch.object(S3Hook, "load_file")
def test_execute_without_failure(
- self, script_location, mock_load_file, mock_get_conn, mock_initialize_job, mock_get_job_state
+ self,
+ script_location,
+ mock_load_file,
+ mock_get_conn,
+ mock_initialize_job,
+ mock_get_job_state,
+ mock_print_job_logs,
):
glue = GlueJobOperator(
task_id='test_glue_operator',
@@ -57,16 +64,52 @@ class TestGlueJobOperator(unittest.TestCase):
)
mock_initialize_job.return_value = {'JobRunState': 'RUNNING', 'JobRunId': '11111'}
mock_get_job_state.return_value = 'SUCCEEDED'
+
glue.execute({})
+
mock_initialize_job.assert_called_once_with({}, {})
+ mock_print_job_logs.assert_not_called()
assert glue.job_name == 'my_test_job'
+ @mock.patch.object(GlueJobHook, 'print_job_logs')
+ @mock.patch.object(GlueJobHook, 'get_job_state')
+ @mock.patch.object(GlueJobHook, 'initialize_job')
+ @mock.patch.object(GlueJobHook, "get_conn")
+ @mock.patch.object(S3Hook, "load_file")
+ def test_execute_with_verbose_logging(
+ self, mock_load_file, mock_get_conn, mock_initialize_job, mock_get_job_state, mock_print_job_logs
+ ):
+ job_name = 'test_job_name'
+ job_run_id = '11111'
+ glue = GlueJobOperator(
+ task_id='test_glue_operator',
+ job_name=job_name,
+ script_location='s3_uri',
+ s3_bucket='bucket_name',
+ iam_role_name='role_arn',
+ verbose=True,
+ )
+ mock_initialize_job.return_value = {'JobRunState': 'RUNNING', 'JobRunId': job_run_id}
+ mock_get_job_state.return_value = 'SUCCEEDED'
+
+ glue.execute({})
+
+ mock_initialize_job.assert_called_once_with({}, {})
+ mock_print_job_logs.assert_called_once_with(
+ job_name=job_name,
+ run_id=job_run_id,
+ job_failed=False,
+ next_token=None,
+ )
+ assert glue.job_name == job_name
+
+ @mock.patch.object(GlueJobHook, 'print_job_logs')
@mock.patch.object(GlueJobHook, 'job_completion')
@mock.patch.object(GlueJobHook, 'initialize_job')
@mock.patch.object(GlueJobHook, "get_conn")
@mock.patch.object(S3Hook, "load_file")
def test_execute_without_waiting_for_completion(
- self, mock_load_file, mock_get_conn, mock_initialize_job, mock_job_completion
+ self, mock_load_file, mock_get_conn, mock_initialize_job, mock_job_completion, mock_print_job_logs
):
glue = GlueJobOperator(
task_id='test_glue_operator',
@@ -79,8 +122,11 @@ class TestGlueJobOperator(unittest.TestCase):
wait_for_completion=False,
)
mock_initialize_job.return_value = {'JobRunState': 'RUNNING', 'JobRunId': '11111'}
+
job_run_id = glue.execute({})
+
mock_initialize_job.assert_called_once_with({}, {})
mock_job_completion.assert_not_called()
+ mock_print_job_logs.assert_not_called()
assert glue.job_name == 'my_test_job'
assert job_run_id == '11111'
diff --git a/tests/providers/amazon/aws/sensors/test_glue.py b/tests/providers/amazon/aws/sensors/test_glue.py
index 672c612353..8fb96ac946 100644
--- a/tests/providers/amazon/aws/sensors/test_glue.py
+++ b/tests/providers/amazon/aws/sensors/test_glue.py
@@ -17,19 +17,24 @@
import unittest
from unittest import mock
+from unittest.mock import ANY
-from airflow import configuration
+import pytest
+
+from airflow import AirflowException
+from airflow.configuration import conf
from airflow.providers.amazon.aws.hooks.glue import GlueJobHook
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
class TestGlueJobSensor(unittest.TestCase):
def setUp(self):
- configuration.load_test_config()
+ conf.load_test_config()
+ @mock.patch.object(GlueJobHook, 'print_job_logs')
@mock.patch.object(GlueJobHook, 'get_conn')
@mock.patch.object(GlueJobHook, 'get_job_state')
- def test_poke(self, mock_get_job_state, mock_conn):
+ def test_poke(self, mock_get_job_state, mock_conn, mock_print_job_logs):
mock_conn.return_value.get_job_run()
mock_get_job_state.return_value = 'SUCCEEDED'
op = GlueJobSensor(
@@ -38,13 +43,40 @@ class TestGlueJobSensor(unittest.TestCase):
run_id='5152fgsfsjhsh61661',
poke_interval=1,
timeout=5,
- aws_conn_id='aws_default',
)
+
+ assert op.poke({})
+ mock_print_job_logs.assert_not_called()
+
+ @mock.patch.object(GlueJobHook, 'print_job_logs')
+ @mock.patch.object(GlueJobHook, 'get_conn')
+ @mock.patch.object(GlueJobHook, 'get_job_state')
+ def test_poke_with_verbose_logging(self, mock_get_job_state, mock_conn, mock_print_job_logs):
+ mock_conn.return_value.get_job_run()
+ mock_get_job_state.return_value = 'SUCCEEDED'
+ job_name = 'job_name'
+ job_run_id = 'job_run_id'
+ op = GlueJobSensor(
+ task_id='test_glue_job_sensor',
+ job_name=job_name,
+ run_id=job_run_id,
+ poke_interval=1,
+ timeout=5,
+ verbose=True,
+ )
+
assert op.poke({})
+ mock_print_job_logs.assert_called_once_with(
+ job_name=job_name,
+ run_id=job_run_id,
+ job_failed=False,
+ next_token=ANY,
+ )
+ @mock.patch.object(GlueJobHook, 'print_job_logs')
@mock.patch.object(GlueJobHook, 'get_conn')
@mock.patch.object(GlueJobHook, 'get_job_state')
- def test_poke_false(self, mock_get_job_state, mock_conn):
+ def test_poke_false(self, mock_get_job_state, mock_conn, mock_print_job_logs):
mock_conn.return_value.get_job_run()
mock_get_job_state.return_value = 'RUNNING'
op = GlueJobSensor(
@@ -53,9 +85,62 @@ class TestGlueJobSensor(unittest.TestCase):
run_id='5152fgsfsjhsh61661',
poke_interval=1,
timeout=5,
- aws_conn_id='aws_default',
)
+
assert not op.poke({})
+ mock_print_job_logs.assert_not_called()
+
+ @mock.patch.object(GlueJobHook, 'print_job_logs')
+ @mock.patch.object(GlueJobHook, 'get_conn')
+ @mock.patch.object(GlueJobHook, 'get_job_state')
+ def test_poke_false_with_verbose_logging(self, mock_get_job_state, mock_conn, mock_print_job_logs):
+ mock_conn.return_value.get_job_run()
+ mock_get_job_state.return_value = 'RUNNING'
+ job_name = 'job_name'
+ job_run_id = 'job_run_id'
+ op = GlueJobSensor(
+ task_id='test_glue_job_sensor',
+ job_name=job_name,
+ run_id=job_run_id,
+ poke_interval=1,
+ timeout=5,
+ verbose=True,
+ )
+
+ assert not op.poke({})
+ mock_print_job_logs.assert_called_once_with(
+ job_name=job_name,
+ run_id=job_run_id,
+ job_failed=False,
+ next_token=ANY,
+ )
+
+ @mock.patch.object(GlueJobHook, 'print_job_logs')
+ @mock.patch.object(GlueJobHook, 'get_conn')
+ @mock.patch.object(GlueJobHook, 'get_job_state')
+ def test_poke_failed_job_with_verbose_logging(self, mock_get_job_state, mock_conn, mock_print_job_logs):
+ mock_conn.return_value.get_job_run()
+ mock_get_job_state.return_value = 'FAILED'
+ job_name = 'job_name'
+ job_run_id = 'job_run_id'
+ op = GlueJobSensor(
+ task_id='test_glue_job_sensor',
+ job_name=job_name,
+ run_id=job_run_id,
+ poke_interval=1,
+ timeout=5,
+ verbose=True,
+ )
+
+ with pytest.raises(AirflowException):
+ assert not op.poke({})
+ mock_print_job_logs.assert_called_once_with(
+ job_name=job_name,
+ run_id=job_run_id,
+ job_failed=True,
+ next_token=ANY,
+ )
if __name__ == '__main__':