You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by vi...@apache.org on 2023/10/03 13:29:45 UTC

[airflow] branch main updated: Include AWS Lambda execution logs to task logs (#34692)

This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3064812280 Include AWS Lambda execution logs to task logs (#34692)
3064812280 is described below

commit 306481228071b708d6ad4b9a97486ab2d9295a5d
Author: Andrey Anshin <An...@taragol.is>
AuthorDate: Tue Oct 3 17:29:34 2023 +0400

    Include AWS Lambda execution logs to task logs (#34692)
---
 .../providers/amazon/aws/hooks/lambda_function.py  | 19 ++++++-
 .../amazon/aws/operators/lambda_function.py        | 21 +++++++-
 .../amazon/aws/hooks/test_lambda_function.py       | 18 +++++++
 .../amazon/aws/operators/test_lambda_function.py   | 63 +++++++++++++++++-----
 4 files changed, 107 insertions(+), 14 deletions(-)

diff --git a/airflow/providers/amazon/aws/hooks/lambda_function.py b/airflow/providers/amazon/aws/hooks/lambda_function.py
index fcb24f7763..d2fe27c23c 100644
--- a/airflow/providers/amazon/aws/hooks/lambda_function.py
+++ b/airflow/providers/amazon/aws/hooks/lambda_function.py
@@ -18,10 +18,12 @@
 """This module contains AWS Lambda hook."""
 from __future__ import annotations
 
+import base64
 from typing import Any
 
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
 from airflow.providers.amazon.aws.utils import trim_none_values
+from airflow.providers.amazon.aws.utils.suppress import return_on_error
 
 
 class LambdaHook(AwsBaseHook):
@@ -59,7 +61,8 @@ class LambdaHook(AwsBaseHook):
 
         :param function_name: AWS Lambda Function Name
         :param invocation_type: AWS Lambda Invocation Type (RequestResponse, Event etc)
-        :param log_type: Tail Invocation Request
+        :param log_type: Set to Tail to include the execution log in the response.
+            Applies to synchronously invoked functions only.
         :param client_context: Up to 3,583 bytes of base64-encoded data about the invoking client
             to pass to the function in the context object.
         :param payload: The JSON that you want to provide to your Lambda function as input.
@@ -179,3 +182,17 @@ class LambdaHook(AwsBaseHook):
             "Architectures": architectures,
         }
         return self.conn.create_function(**trim_none_values(create_function_args))
+
+    @staticmethod
+    @return_on_error(None)
+    def encode_log_result(log_result: str, *, keep_empty_lines: bool = True) -> list[str] | None:
+        """
+        Encode execution log from the response and return list of log records.
+
+        Returns ``None`` on error, e.g. invalid base64-encoded string
+
+        :param log_result: base64-encoded string which contain Lambda execution Log.
+        :param keep_empty_lines: Whether or not keep empty lines.
+        """
+        encoded_log_result = base64.b64decode(log_result.encode("ascii")).decode()
+        return [log_row for log_row in encoded_log_result.splitlines() if keep_empty_lines or log_row]
diff --git a/airflow/providers/amazon/aws/operators/lambda_function.py b/airflow/providers/amazon/aws/operators/lambda_function.py
index 26f9f38cbb..61d97ebed3 100644
--- a/airflow/providers/amazon/aws/operators/lambda_function.py
+++ b/airflow/providers/amazon/aws/operators/lambda_function.py
@@ -165,7 +165,10 @@ class LambdaInvokeFunctionOperator(BaseOperator):
         :ref:`howto/operator:LambdaInvokeFunctionOperator`
 
     :param function_name: The name of the AWS Lambda function, version, or alias.
-    :param log_type: Set to Tail to include the execution log in the response. Otherwise, set to "None".
+    :param log_type: Set to Tail to include the execution log in the response and task logs.
+        Otherwise, set to "None". Applies to synchronously invoked functions only,
+        and returns the last 4 KB of the execution log.
+    :param keep_empty_log_lines: Whether or not keep empty lines in the execution log.
     :param qualifier: Specify a version or alias to invoke a published version of the function.
     :param invocation_type: AWS Lambda invocation type (RequestResponse, Event, DryRun)
     :param client_context: Data about the invoking client to pass to the function in the context object
@@ -181,6 +184,7 @@ class LambdaInvokeFunctionOperator(BaseOperator):
         *,
         function_name: str,
         log_type: str | None = None,
+        keep_empty_log_lines: bool = True,
         qualifier: str | None = None,
         invocation_type: str | None = None,
         client_context: str | None = None,
@@ -192,6 +196,7 @@ class LambdaInvokeFunctionOperator(BaseOperator):
         self.function_name = function_name
         self.payload = payload
         self.log_type = log_type
+        self.keep_empty_log_lines = keep_empty_log_lines
         self.qualifier = qualifier
         self.invocation_type = invocation_type
         self.client_context = client_context
@@ -218,6 +223,20 @@ class LambdaInvokeFunctionOperator(BaseOperator):
             qualifier=self.qualifier,
         )
         self.log.info("Lambda response metadata: %r", response.get("ResponseMetadata"))
+
+        if log_result := response.get("LogResult"):
+            log_records = self.hook.encode_log_result(
+                log_result,
+                keep_empty_lines=self.keep_empty_log_lines,
+            )
+            if log_records:
+                self.log.info(
+                    "The last 4 KB of the Lambda execution log (keep_empty_log_lines=%s).",
+                    self.keep_empty_log_lines,
+                )
+                for log_record in log_records:
+                    self.log.info(log_record)
+
         if response.get("StatusCode") not in success_status_codes:
             raise ValueError("Lambda function did not execute", json.dumps(response.get("ResponseMetadata")))
         payload_stream = response.get("Payload")
diff --git a/tests/providers/amazon/aws/hooks/test_lambda_function.py b/tests/providers/amazon/aws/hooks/test_lambda_function.py
index caaf164be4..006d0977a2 100644
--- a/tests/providers/amazon/aws/hooks/test_lambda_function.py
+++ b/tests/providers/amazon/aws/hooks/test_lambda_function.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+import base64
 from unittest import mock
 from unittest.mock import MagicMock
 
@@ -31,6 +32,8 @@ RUNTIME = "python3.9"
 ROLE = "role"
 HANDLER = "handler"
 CODE = {}
+LOG_RESPONSE = base64.b64encode(b"FOO\n\nBAR\n\n").decode()
+BAD_LOG_RESPONSE = LOG_RESPONSE[:-3]
 
 
 class LambdaHookForTests(LambdaHook):
@@ -136,3 +139,18 @@ class TestLambdaHook:
                 package_type="Zip",
                 **params,
             )
+
+    def test_encode_log_result(self):
+        assert LambdaHook.encode_log_result(LOG_RESPONSE) == ["FOO", "", "BAR", ""]
+        assert LambdaHook.encode_log_result(LOG_RESPONSE, keep_empty_lines=False) == ["FOO", "BAR"]
+        assert LambdaHook.encode_log_result("") == []
+
+    @pytest.mark.parametrize(
+        "log_result",
+        [
+            pytest.param(BAD_LOG_RESPONSE, id="corrupted"),
+            pytest.param(None, id="none"),
+        ],
+    )
+    def test_encode_corrupted_log_result(self, log_result):
+        assert LambdaHook.encode_log_result(log_result) is None
diff --git a/tests/providers/amazon/aws/operators/test_lambda_function.py b/tests/providers/amazon/aws/operators/test_lambda_function.py
index 6fc3a3b64f..583dc1fc54 100644
--- a/tests/providers/amazon/aws/operators/test_lambda_function.py
+++ b/tests/providers/amazon/aws/operators/test_lambda_function.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+import base64
 from unittest import mock
 from unittest.mock import Mock, patch
 
@@ -30,10 +31,15 @@ from airflow.providers.amazon.aws.operators.lambda_function import (
 )
 
 FUNCTION_NAME = "function_name"
-PAYLOAD = '{"hello": "airflow"}'
-BYTES_PAYLOAD = b'{"hello": "airflow"}'
+PAYLOADS = [
+    pytest.param('{"hello": "airflow"}', id="string-payload"),
+    pytest.param(b'{"hello": "airflow"}', id="bytes-payload"),
+]
 ROLE_ARN = "role_arn"
 IMAGE_URI = "image_uri"
+LOG_RESPONSE = base64.b64encode(b"FOO\n\nBAR\n\n").decode()
+BAD_LOG_RESPONSE = LOG_RESPONSE[:-3]
+NO_LOG_RESPONSE_SENTINEL = type("NoLogResponseSentinel", (), {})()
 
 
 class TestLambdaCreateFunctionOperator:
@@ -86,10 +92,7 @@ class TestLambdaCreateFunctionOperator:
 
 
 class TestLambdaInvokeFunctionOperator:
-    @pytest.mark.parametrize(
-        "payload",
-        [PAYLOAD, BYTES_PAYLOAD],
-    )
+    @pytest.mark.parametrize("payload", PAYLOADS)
     def test_init(self, payload):
         lambda_operator = LambdaInvokeFunctionOperator(
             task_id="test",
@@ -104,33 +107,57 @@ class TestLambdaInvokeFunctionOperator:
         assert lambda_operator.log_type == "None"
         assert lambda_operator.aws_conn_id == "aws_conn_test"
 
-    @patch.object(LambdaInvokeFunctionOperator, "hook", new_callable=mock.PropertyMock)
+    @mock.patch.object(LambdaHook, "invoke_lambda")
+    @mock.patch.object(LambdaHook, "conn")
     @pytest.mark.parametrize(
-        "payload",
-        [PAYLOAD, BYTES_PAYLOAD],
+        "keep_empty_log_lines", [pytest.param(True, id="keep"), pytest.param(False, id="truncate")]
     )
-    def test_invoke_lambda(self, hook_mock, payload):
+    @pytest.mark.parametrize(
+        "log_result, expected_execution_logs",
+        [
+            pytest.param(LOG_RESPONSE, True, id="log-result"),
+            pytest.param(BAD_LOG_RESPONSE, False, id="corrupted-log-result"),
+            pytest.param(None, False, id="none-log-result"),
+            pytest.param(NO_LOG_RESPONSE_SENTINEL, False, id="no-response"),
+        ],
+    )
+    @pytest.mark.parametrize("payload", PAYLOADS)
+    def test_invoke_lambda(
+        self,
+        mock_conn,
+        mock_invoke,
+        payload,
+        keep_empty_log_lines,
+        log_result,
+        expected_execution_logs,
+        caplog,
+    ):
         operator = LambdaInvokeFunctionOperator(
             task_id="task_test",
             function_name="a",
             invocation_type="b",
             log_type="c",
+            keep_empty_log_lines=keep_empty_log_lines,
             client_context="d",
             payload=payload,
             qualifier="f",
         )
         returned_payload = Mock()
         returned_payload.read().decode.return_value = "data was read"
-        hook_mock().invoke_lambda.return_value = {
+        fake_response = {
             "ResponseMetadata": "",
             "StatusCode": 200,
             "Payload": returned_payload,
         }
+        if log_result is not NO_LOG_RESPONSE_SENTINEL:
+            fake_response["LogResult"] = log_result
+        mock_invoke.return_value = fake_response
 
+        caplog.set_level("INFO", "airflow.task.operators")
         value = operator.execute(None)
 
         assert value == "data was read"
-        hook_mock().invoke_lambda.assert_called_once_with(
+        mock_invoke.assert_called_once_with(
             function_name="a",
             invocation_type="b",
             log_type="c",
@@ -139,6 +166,18 @@ class TestLambdaInvokeFunctionOperator:
             qualifier="f",
         )
 
+        # Validate log messages in task logs
+        if expected_execution_logs:
+            assert "The last 4 KB of the Lambda execution log" in caplog.text
+            assert "FOO" in caplog.messages
+            assert "BAR" in caplog.messages
+            if keep_empty_log_lines:
+                assert "" in caplog.messages
+            else:
+                assert "" not in caplog.messages
+        else:
+            assert "The last 4 KB of the Lambda execution log" not in caplog.text
+
     @patch.object(LambdaInvokeFunctionOperator, "hook", new_callable=mock.PropertyMock)
     def test_invoke_lambda_bad_http_code(self, hook_mock):
         operator = LambdaInvokeFunctionOperator(