You are viewing a plain text version of this content; the canonical link was removed during plain-text conversion and can be found in the original mailing-list archive entry.
Posted to commits@airflow.apache.org by el...@apache.org on 2022/12/06 21:23:30 UTC
[airflow] branch main updated: Fix Type Error while using DynamoDBToS3Operator (#28158)
This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0d90c62bac Fix Type Error while using DynamoDBToS3Operator (#28158)
0d90c62bac is described below
commit 0d90c62bac49de9aef6a31ee3e62d02e458b0d33
Author: sanjayp <sa...@gmail.com>
AuthorDate: Tue Dec 6 15:23:22 2022 -0600
Fix Type Error while using DynamoDBToS3Operator (#28158)
---
.../amazon/aws/transfers/dynamodb_to_s3.py | 13 ++++++-
.../amazon/aws/transfers/test_dynamodb_to_s3.py | 43 +++++++++++++++++++++-
2 files changed, 54 insertions(+), 2 deletions(-)
diff --git a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
index 155f5439a6..716e9c2721 100644
--- a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
@@ -23,6 +23,7 @@ from __future__ import annotations
import json
from copy import copy
+from decimal import Decimal
from os.path import getsize
from tempfile import NamedTemporaryFile
from typing import IO, TYPE_CHECKING, Any, Callable, Sequence
@@ -36,8 +37,18 @@ if TYPE_CHECKING:
from airflow.utils.context import Context
+class JSONEncoder(json.JSONEncoder):
+ """Custom json encoder implementation"""
+
+ def default(self, obj):
+ """Convert decimal objects in a json serializable format."""
+ if isinstance(obj, Decimal):
+ return float(obj)
+ return super().default(obj)
+
+
def _convert_item_to_json_bytes(item: dict[str, Any]) -> bytes:
- return (json.dumps(item) + "\n").encode("utf-8")
+ return (json.dumps(item, cls=JSONEncoder) + "\n").encode("utf-8")
def _upload_file_to_s3(
diff --git a/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py b/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
index 2aea42754b..2de653fb4a 100644
--- a/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
+++ b/tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
@@ -19,9 +19,21 @@ from __future__ import annotations
import json
import unittest
+from decimal import Decimal
from unittest.mock import MagicMock, patch
-from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator
+from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator, JSONEncoder
+
+
+class JSONEncoderTest(unittest.TestCase):
+ def test_jsonencoder_with_decimal(self):
+ """Test JSONEncoder correctly encodes and decodes decimal values."""
+
+ for i in ["102938.3043847474", 1.010001, 10, "100", "1E-128", 1e-128]:
+ org = Decimal(i)
+ encoded = json.dumps(org, cls=JSONEncoder)
+ decoded = json.loads(encoded, parse_float=Decimal)
+ self.assertAlmostEqual(decoded, org)
class DynamodbToS3Test(unittest.TestCase):
@@ -65,6 +77,35 @@ class DynamodbToS3Test(unittest.TestCase):
assert [{"a": 1}, {"b": 2}, {"c": 3}] == self.output_queue
+ @patch("airflow.providers.amazon.aws.transfers.dynamodb_to_s3.S3Hook")
+ @patch("airflow.providers.amazon.aws.transfers.dynamodb_to_s3.DynamoDBHook")
+ def test_dynamodb_to_s3_success_with_decimal(self, mock_aws_dynamodb_hook, mock_s3_hook):
+ a = Decimal(10.028)
+ b = Decimal("10.048")
+ responses = [
+ {
+ "Items": [{"a": a}, {"b": b}],
+ }
+ ]
+ table = MagicMock()
+ table.return_value.scan.side_effect = responses
+ mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table
+
+ s3_client = MagicMock()
+ s3_client.return_value.upload_file = self.mock_upload_file
+ mock_s3_hook.return_value.get_conn = s3_client
+
+ dynamodb_to_s3_operator = DynamoDBToS3Operator(
+ task_id="dynamodb_to_s3",
+ dynamodb_table_name="airflow_rocks",
+ s3_bucket_name="airflow-bucket",
+ file_size=4000,
+ )
+
+ dynamodb_to_s3_operator.execute(context={})
+
+ assert [{"a": float(a)}, {"b": float(b)}] == self.output_queue
+
@patch("airflow.providers.amazon.aws.transfers.dynamodb_to_s3.S3Hook")
@patch("airflow.providers.amazon.aws.transfers.dynamodb_to_s3.DynamoDBHook")
def test_dynamodb_to_s3_with_different_aws_conn_id(self, mock_aws_dynamodb_hook, mock_s3_hook):