You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/11/03 07:02:39 UTC

[airflow] branch main updated: Fix S3ToRedshiftOperator (#19358)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6148ddd  Fix S3ToRedshiftOperator (#19358)
6148ddd is described below

commit 6148ddd365939bb5129b342900a576bd855e9fc4
Author: Mario Taddeucci <ma...@gmx.com>
AuthorDate: Wed Nov 3 04:01:50 2021 -0300

    Fix S3ToRedshiftOperator (#19358)
---
 .../amazon/aws/transfers/s3_to_redshift.py         | 25 ++++++++++------------
 .../amazon/aws/transfers/test_s3_to_redshift.py    |  6 +++---
 2 files changed, 14 insertions(+), 17 deletions(-)

diff --git a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
index c0a1a7c..5f5eb38 100644
--- a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
+++ b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
@@ -149,12 +149,7 @@ class S3ToRedshiftOperator(BaseOperator):
         copy_statement = self._build_copy_query(copy_destination, credentials_block, copy_options)
 
         if self.method == 'REPLACE':
-            sql = f"""
-            BEGIN;
-            DELETE FROM {destination};
-            {copy_statement}
-            COMMIT
-            """
+            sql = ["BEGIN;", f"DELETE FROM {destination};", copy_statement, "COMMIT"]
         elif self.method == 'UPSERT':
             keys = self.upsert_keys or redshift_hook.get_table_primary_key(self.table, self.schema)
             if not keys:
@@ -162,14 +157,16 @@ class S3ToRedshiftOperator(BaseOperator):
                     f"No primary key on {self.schema}.{self.table}. Please provide keys on 'upsert_keys'"
                 )
             where_statement = ' AND '.join([f'{self.table}.{k} = {copy_destination}.{k}' for k in keys])
-            sql = f"""
-            CREATE TABLE {copy_destination} (LIKE {destination});
-            {copy_statement}
-            BEGIN;
-            DELETE FROM {destination} USING {copy_destination} WHERE {where_statement};
-            INSERT INTO {destination} SELECT * FROM {copy_destination};
-            COMMIT
-            """
+
+            sql = [
+                f"CREATE TABLE {copy_destination} (LIKE {destination});",
+                copy_statement,
+                "BEGIN;",
+                f"DELETE FROM {destination} USING {copy_destination} WHERE {where_statement};",
+                f"INSERT INTO {destination} SELECT * FROM {copy_destination};",
+                "COMMIT",
+            ]
+
         else:
             sql = copy_statement
 
diff --git a/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py b/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
index ff03165..007ed87 100644
--- a/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
+++ b/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
@@ -170,7 +170,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
                     {copy_statement}
                     COMMIT
                     """
-        assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], transaction)
+        assert_equal_ignore_multiple_spaces(self, "\n".join(mock_run.call_args[0][0]), transaction)
 
         assert mock_run.call_count == 1
 
@@ -222,7 +222,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
                     {copy_statement}
                     COMMIT
                     """
-        assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], transaction)
+        assert_equal_ignore_multiple_spaces(self, "\n".join(mock_run.call_args[0][0]), transaction)
 
         assert mock_run.call_count == 1
 
@@ -277,7 +277,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
                     INSERT INTO {schema}.{table} SELECT * FROM #{table};
                     COMMIT
                     """
-        assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], transaction)
+        assert_equal_ignore_multiple_spaces(self, "\n".join(mock_run.call_args[0][0]), transaction)
 
         assert mock_run.call_count == 1