You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/11/03 07:02:39 UTC
[airflow] branch main updated: Fix S3ToRedshiftOperator (#19358)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6148ddd Fix S3ToRedshiftOperator (#19358)
6148ddd is described below
commit 6148ddd365939bb5129b342900a576bd855e9fc4
Author: Mario Taddeucci <ma...@gmx.com>
AuthorDate: Wed Nov 3 04:01:50 2021 -0300
Fix S3ToRedshiftOperator (#19358)
---
.../amazon/aws/transfers/s3_to_redshift.py | 25 ++++++++++------------
.../amazon/aws/transfers/test_s3_to_redshift.py | 6 +++---
2 files changed, 14 insertions(+), 17 deletions(-)
diff --git a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
index c0a1a7c..5f5eb38 100644
--- a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
+++ b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
@@ -149,12 +149,7 @@ class S3ToRedshiftOperator(BaseOperator):
copy_statement = self._build_copy_query(copy_destination, credentials_block, copy_options)
if self.method == 'REPLACE':
- sql = f"""
- BEGIN;
- DELETE FROM {destination};
- {copy_statement}
- COMMIT
- """
+ sql = ["BEGIN;", f"DELETE FROM {destination};", copy_statement, "COMMIT"]
elif self.method == 'UPSERT':
keys = self.upsert_keys or redshift_hook.get_table_primary_key(self.table, self.schema)
if not keys:
@@ -162,14 +157,16 @@ class S3ToRedshiftOperator(BaseOperator):
f"No primary key on {self.schema}.{self.table}. Please provide keys on 'upsert_keys'"
)
where_statement = ' AND '.join([f'{self.table}.{k} = {copy_destination}.{k}' for k in keys])
- sql = f"""
- CREATE TABLE {copy_destination} (LIKE {destination});
- {copy_statement}
- BEGIN;
- DELETE FROM {destination} USING {copy_destination} WHERE {where_statement};
- INSERT INTO {destination} SELECT * FROM {copy_destination};
- COMMIT
- """
+
+ sql = [
+ f"CREATE TABLE {copy_destination} (LIKE {destination});",
+ copy_statement,
+ "BEGIN;",
+ f"DELETE FROM {destination} USING {copy_destination} WHERE {where_statement};",
+ f"INSERT INTO {destination} SELECT * FROM {copy_destination};",
+ "COMMIT",
+ ]
+
else:
sql = copy_statement
diff --git a/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py b/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
index ff03165..007ed87 100644
--- a/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
+++ b/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
@@ -170,7 +170,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
{copy_statement}
COMMIT
"""
- assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], transaction)
+ assert_equal_ignore_multiple_spaces(self, "\n".join(mock_run.call_args[0][0]), transaction)
assert mock_run.call_count == 1
@@ -222,7 +222,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
{copy_statement}
COMMIT
"""
- assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], transaction)
+ assert_equal_ignore_multiple_spaces(self, "\n".join(mock_run.call_args[0][0]), transaction)
assert mock_run.call_count == 1
@@ -277,7 +277,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
INSERT INTO {schema}.{table} SELECT * FROM #{table};
COMMIT
"""
- assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], transaction)
+ assert_equal_ignore_multiple_spaces(self, "\n".join(mock_run.call_args[0][0]), transaction)
assert mock_run.call_count == 1