Posted to commits@airflow.apache.org by el...@apache.org on 2022/12/06 21:23:30 UTC

[airflow] branch main updated: Fix Type Error while using DynamoDBToS3Operator (#28158)

This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d90c62bac Fix Type Error while using DynamoDBToS3Operator (#28158)
0d90c62bac is described below

commit 0d90c62bac49de9aef6a31ee3e62d02e458b0d33
Author: sanjayp <sa...@gmail.com>
AuthorDate: Tue Dec 6 15:23:22 2022 -0600

    Fix Type Error while using DynamoDBToS3Operator (#28158)
---
 .../amazon/aws/transfers/dynamodb_to_s3.py         | 13 ++++++-
 .../amazon/aws/transfers/test_dynamodb_to_s3.py    | 43 +++++++++++++++++++++-
 2 files changed, 54 insertions(+), 2 deletions(-)
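For context on the failure mode: boto3's DynamoDB resource deserializes
number attributes as decimal.Decimal, which the stdlib json encoder
rejects. A minimal sketch of the TypeError this commit fixes (the item
below stands in for a real table scan result; no AWS call is made):

    import json
    from decimal import Decimal

    # boto3 returns DynamoDB number attributes as Decimal, e.g. a
    # table scan can yield {"price": Decimal("10.048")}
    item = {"price": Decimal("10.048")}

    try:
        json.dumps(item)  # stdlib encoder has no handler for Decimal
    except TypeError as err:
        print(err)  # Object of type Decimal is not JSON serializable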

diff --git a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
index 155f5439a6..716e9c2721 100644
--- a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
@@ -23,6 +23,7 @@ from __future__ import annotations
 
 import json
 from copy import copy
+from decimal import Decimal
 from os.path import getsize
 from tempfile import NamedTemporaryFile
 from typing import IO, TYPE_CHECKING, Any, Callable, Sequence
@@ -36,8 +37,18 @@ if TYPE_CHECKING:
     from airflow.utils.context import Context
 
 
+class JSONEncoder(json.JSONEncoder):
+    """Custom json encoder implementation"""
+
+    def default(self, obj):
+        """Convert decimal objects in a json serializable format."""
+        if isinstance(obj, Decimal):
+            return float(obj)
+        return super().default(obj)
+
+
 def _convert_item_to_json_bytes(item: dict[str, Any]) -> bytes:
-    return (json.dumps(item) + "\n").encode("utf-8")
+    return (json.dumps(item, cls=JSONEncoder) + "\n").encode("utf-8")
 
 
 def _upload_file_to_s3(
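With the encoder above in place, the same item serializes cleanly. A
standalone sketch of the new behavior (the class is repeated here for
illustration rather than imported from the provider package):

    import json
    from decimal import Decimal

    class JSONEncoder(json.JSONEncoder):
        """Mirror of the encoder added in this commit."""

        def default(self, obj):
            # Encode Decimal as float; defer everything else to the
            # stdlib encoder, which raises TypeError as usual.
            if isinstance(obj, Decimal):
                return float(obj)
            return super().default(obj)

    item = {"price": Decimal("10.048")}
    print(json.dumps(item, cls=JSONEncoder))  # {"price": 10.048}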
diff --git a/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py b/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
index 2aea42754b..2de653fb4a 100644
--- a/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
+++ b/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
@@ -19,9 +19,21 @@ from __future__ import annotations
 
 import json
 import unittest
+from decimal import Decimal
 from unittest.mock import MagicMock, patch
 
-from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator
+from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator, JSONEncoder
+
+
+class JSONEncoderTest(unittest.TestCase):
+    def test_jsonencoder_with_decimal(self):
+        """Test JSONEncoder correctly encodes and decodes decimal values."""
+
+        for i in ["102938.3043847474", 1.010001, 10, "100", "1E-128", 1e-128]:
+            org = Decimal(i)
+            encoded = json.dumps(org, cls=JSONEncoder)
+            decoded = json.loads(encoded, parse_float=Decimal)
+            self.assertAlmostEqual(decoded, org)
 
 
 class DynamodbToS3Test(unittest.TestCase):
@@ -65,6 +77,35 @@ class DynamodbToS3Test(unittest.TestCase):
 
         assert [{"a": 1}, {"b": 2}, {"c": 3}] == self.output_queue
 
+    @patch("airflow.providers.amazon.aws.transfers.dynamodb_to_s3.S3Hook")
+    @patch("airflow.providers.amazon.aws.transfers.dynamodb_to_s3.DynamoDBHook")
+    def test_dynamodb_to_s3_success_with_decimal(self, mock_aws_dynamodb_hook, mock_s3_hook):
+        a = Decimal(10.028)
+        b = Decimal("10.048")
+        responses = [
+            {
+                "Items": [{"a": a}, {"b": b}],
+            }
+        ]
+        table = MagicMock()
+        table.return_value.scan.side_effect = responses
+        mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table
+
+        s3_client = MagicMock()
+        s3_client.return_value.upload_file = self.mock_upload_file
+        mock_s3_hook.return_value.get_conn = s3_client
+
+        dynamodb_to_s3_operator = DynamoDBToS3Operator(
+            task_id="dynamodb_to_s3",
+            dynamodb_table_name="airflow_rocks",
+            s3_bucket_name="airflow-bucket",
+            file_size=4000,
+        )
+
+        dynamodb_to_s3_operator.execute(context={})
+
+        assert [{"a": float(a)}, {"b": float(b)}] == self.output_queue
+
     @patch("airflow.providers.amazon.aws.transfers.dynamodb_to_s3.S3Hook")
     @patch("airflow.providers.amazon.aws.transfers.dynamodb_to_s3.DynamoDBHook")
     def test_dynamodb_to_s3_with_different_aws_conn_id(self, mock_aws_dynamodb_hook, mock_s3_hook):
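One caveat worth noting: encoding Decimal as float is lossy once a value
exceeds binary float precision (DynamoDB numbers carry up to 38 digits
of precision), which is why the test above compares with
assertAlmostEqual rather than strict equality. A quick illustration:

    from decimal import Decimal

    # float holds roughly 15-17 significant decimal digits, so a
    # 38-digit DynamoDB number is rounded on conversion.
    d = Decimal("3.141592653589793238462643383279502884")
    print(float(d))  # 3.141592653589793 -- trailing digits are lost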