Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/03 14:12:46 UTC

[GitHub] [airflow] sunki-hong opened a new pull request #16241: Update copy command for s3 to redshift

sunki-hong opened a new pull request #16241:
URL: https://github.com/apache/airflow/pull/16241


    Update the COPY command for s3_to_redshift so that column names can be specified when copying files from S3 to Redshift.
   
    ```sql
    COPY tablename (column1 [, column2, ...])
    FROM 's3://bucket/key'
    ```
   
    Related AWS documentation ([Link](https://docs.aws.amazon.com/ko_kr/redshift/latest/dg/r_COPY.html)).
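    
    For illustration, a minimal usage sketch of the operator with the new parameter (a hedged example: the task and connection IDs are placeholders, and the `copy_options` value depends on your file format):
    
    ```python
    from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

    # Hypothetical task: copy only two named columns from an S3 file into Redshift.
    transfer = S3ToRedshiftOperator(
        task_id="s3_to_redshift",            # placeholder task id
        schema="public",                     # target Redshift schema
        table="users",                       # target Redshift table
        s3_bucket="my-bucket",               # placeholder source bucket
        s3_key="exports/users.csv",          # placeholder source key
        column_list=["id", "email"],         # new: columns named in the COPY command
        copy_options=["CSV"],                # options appended to the COPY query
        redshift_conn_id="redshift_default",
        aws_conn_id="aws_default",
    )
    ```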
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #16241: Update copy command for s3 to redshift

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r646098061



##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -65,6 +65,45 @@ def test_execute(self, mock_run, mock_session):
         assert secret_key in copy_query
         assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
 
+    @mock.patch("boto3.session.Session")
+    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    def test_execute_with_column_list(self, mock_run, mock_session):
+        access_key = "aws_access_key_id"
+        secret_key = "aws_secret_access_key"
+        mock_session.return_value = Session(access_key, secret_key)
+        mock_session.return_value.access_key = access_key
+        mock_session.return_value.secret_key = secret_key
+        mock_session.return_value.token = None
+
+        schema = "schema"
+        table = "table"
+        s3_bucket = "bucket"
+        s3_key = "key"
+        column_list = ["column_1", "column_2"]
+        copy_options = ""
+
+        op = S3ToRedshiftOperator(
+            schema=schema,
+            table=table,
+            s3_bucket=s3_bucket,
+            s3_key=s3_key,
+            column_list=column_list,
+            copy_options=copy_options,
+            redshift_conn_id="redshift_conn_id",
+            aws_conn_id="aws_conn_id",
+            task_id="task_id",
+            dag=None,
+        )
+        op.execute(None)
+
+        credentials_block = build_credentials_block(mock_session.return_value)
+        copy_query = op._build_copy_query(credentials_block, copy_options)

Review comment:
       Can you add tests for SQL without columns?







[GitHub] [airflow] sunki-hong commented on a change in pull request #16241: Update copy command for s3 to redshift

Posted by GitBox <gi...@apache.org>.
sunki-hong commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r646093864



##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -65,6 +65,45 @@ def test_execute(self, mock_run, mock_session):
         assert secret_key in copy_query
         assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
 
+    @mock.patch("boto3.session.Session")
+    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    def test_execute_with_column_list(self, mock_run, mock_session):
+        access_key = "aws_access_key_id"
+        secret_key = "aws_secret_access_key"
+        mock_session.return_value = Session(access_key, secret_key)
+        mock_session.return_value.access_key = access_key
+        mock_session.return_value.secret_key = secret_key
+        mock_session.return_value.token = None
+
+        schema = "schema"
+        table = "table"
+        s3_bucket = "bucket"
+        s3_key = "key"
+        column_list = ["column_1", "column_2"]
+        copy_options = ""
+
+        op = S3ToRedshiftOperator(
+            schema=schema,
+            table=table,
+            s3_bucket=s3_bucket,
+            s3_key=s3_key,
+            column_list=column_list,
+            copy_options=copy_options,
+            redshift_conn_id="redshift_conn_id",
+            aws_conn_id="aws_conn_id",
+            task_id="task_id",
+            dag=None,
+        )
+        op.execute(None)
+
+        credentials_block = build_credentials_block(mock_session.return_value)
+        copy_query = op._build_copy_query(credentials_block, copy_options)

Review comment:
       Oh then like this?
   
   ```suggestion
           copy_query = op._build_copy_query(credentials_block, copy_options)
           expected_copy_query = '''
                                 COPY schema.table (column_1, column_2)
                                 FROM 's3://bucket/key'
                                 with credentials
                                 'aws_access_key_id=aws_access_key_id;aws_secret_access_key=aws_secret_access_key'
                                 ;
                                 '''
           assert_equal_ignore_multiple_spaces(self, expected_copy_query, copy_query)
   
   ```







[GitHub] [airflow] mik-laj commented on a change in pull request #16241: Update copy command for s3 to redshift

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r646124491



##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -59,6 +59,61 @@ def test_execute(self, mock_run, mock_session):
 
         credentials_block = build_credentials_block(mock_session.return_value)
         copy_query = op._build_copy_query(credentials_block, copy_options)
+        expected_copy_query = '''
+                                COPY schema.table
+                                FROM 's3://bucket/key'
+                                with credentials
+                                'aws_access_key_id=aws_access_key_id;aws_secret_access_key=aws_secret_access_key'
+                                ;
+                             '''
+        assert_equal_ignore_multiple_spaces(self, expected_copy_query, copy_query)
+
+        assert mock_run.call_count == 1
+        assert access_key in copy_query
+        assert secret_key in copy_query
+        assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)

Review comment:
       Here we have access to the query, so we can drop the invocation of the private method and compare this value directly. The test will still check the implementation, but will depend less on internal details.







[GitHub] [airflow] mik-laj commented on a change in pull request #16241: Update copy command for s3 to redshift

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r646097855



##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -65,6 +65,45 @@ def test_execute(self, mock_run, mock_session):
         assert secret_key in copy_query
         assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
 
+    @mock.patch("boto3.session.Session")
+    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    def test_execute_with_column_list(self, mock_run, mock_session):
+        access_key = "aws_access_key_id"
+        secret_key = "aws_secret_access_key"
+        mock_session.return_value = Session(access_key, secret_key)
+        mock_session.return_value.access_key = access_key
+        mock_session.return_value.secret_key = secret_key
+        mock_session.return_value.token = None
+
+        schema = "schema"
+        table = "table"
+        s3_bucket = "bucket"
+        s3_key = "key"
+        column_list = ["column_1", "column_2"]
+        copy_options = ""
+
+        op = S3ToRedshiftOperator(
+            schema=schema,
+            table=table,
+            s3_bucket=s3_bucket,
+            s3_key=s3_key,
+            column_list=column_list,
+            copy_options=copy_options,
+            redshift_conn_id="redshift_conn_id",
+            aws_conn_id="aws_conn_id",
+            task_id="task_id",
+            dag=None,
+        )
+        op.execute(None)
+
+        credentials_block = build_credentials_block(mock_session.return_value)
+        copy_query = op._build_copy_query(credentials_block, copy_options)

Review comment:
       Exactly. Building this SQL is crucial for integrating with a third-party service, so it should show up in the tests.







[GitHub] [airflow] sunki-hong commented on a change in pull request #16241: Update copy command for s3 to redshift

Posted by GitBox <gi...@apache.org>.
sunki-hong commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r646774062



##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -59,6 +59,61 @@ def test_execute(self, mock_run, mock_session):
 
         credentials_block = build_credentials_block(mock_session.return_value)
         copy_query = op._build_copy_query(credentials_block, copy_options)
+        expected_copy_query = '''
+                                COPY schema.table
+                                FROM 's3://bucket/key'
+                                with credentials
+                                'aws_access_key_id=aws_access_key_id;aws_secret_access_key=aws_secret_access_key'
+                                ;
+                             '''
+        assert_equal_ignore_multiple_spaces(self, expected_copy_query, copy_query)
+
+        assert mock_run.call_count == 1
+        assert access_key in copy_query
+        assert secret_key in copy_query
+        assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)

Review comment:
       Done :)
   Changed all `copy_query` assertions to plain text.
   Thanks for all the comments @mik-laj







[GitHub] [airflow] sunki-hong commented on a change in pull request #16241: Update copy command for s3 to redshift

Posted by GitBox <gi...@apache.org>.
sunki-hong commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r646005770



##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -65,6 +65,45 @@ def test_execute(self, mock_run, mock_session):
         assert secret_key in copy_query
         assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
 
+    @mock.patch("boto3.session.Session")
+    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    def test_execute_with_column_list(self, mock_run, mock_session):
+        access_key = "aws_access_key_id"
+        secret_key = "aws_secret_access_key"
+        mock_session.return_value = Session(access_key, secret_key)
+        mock_session.return_value.access_key = access_key
+        mock_session.return_value.secret_key = secret_key
+        mock_session.return_value.token = None
+
+        schema = "schema"
+        table = "table"
+        s3_bucket = "bucket"
+        s3_key = "key"
+        column_list = ["column_1", "column_2"]
+        copy_options = ""
+
+        op = S3ToRedshiftOperator(
+            schema=schema,
+            table=table,
+            s3_bucket=s3_bucket,
+            s3_key=s3_key,
+            column_list=column_list,
+            copy_options=copy_options,
+            redshift_conn_id="redshift_conn_id",
+            aws_conn_id="aws_conn_id",
+            task_id="task_id",
+            dag=None,
+        )
+        op.execute(None)
+
+        credentials_block = build_credentials_block(mock_session.return_value)
+        copy_query = op._build_copy_query(credentials_block, copy_options)

Review comment:
       Hi @mik-laj :) Thanks for the review.
   
   Could you explain it in more detail? 
   
    Do you want the result of `op._build_copy_query` to be asserted against an expected plain-text COPY statement?
    I thought that line 105, `assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)`, already checks the result.
   
   
    FYI, the expected `copy_query` looks like this:
   ```sql
   COPY schema.table (column_1, column_2)
   FROM 's3://bucket/key'
   with credentials
   'aws_access_key_id=aws_access_key_id;aws_secret_access_key=aws_secret_access_key'
   ;
   ```
   
   
   
   







[GitHub] [airflow] sunki-hong commented on a change in pull request #16241: Update copy command for s3 to redshift

Posted by GitBox <gi...@apache.org>.
sunki-hong commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r646130343



##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -59,6 +59,61 @@ def test_execute(self, mock_run, mock_session):
 
         credentials_block = build_credentials_block(mock_session.return_value)
         copy_query = op._build_copy_query(credentials_block, copy_options)
+        expected_copy_query = '''
+                                COPY schema.table
+                                FROM 's3://bucket/key'
+                                with credentials
+                                'aws_access_key_id=aws_access_key_id;aws_secret_access_key=aws_secret_access_key'
+                                ;
+                             '''
+        assert_equal_ignore_multiple_spaces(self, expected_copy_query, copy_query)
+
+        assert mock_run.call_count == 1
+        assert access_key in copy_query
+        assert secret_key in copy_query
+        assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)

Review comment:
       Thanks for the comment ;)
    So would it be nicer to check the execution against plain text, rather than by invoking the private method `op._build_copy_query`, like this?
   
    From checking the execution by invoking the private method:
   ```python
           copy_query = op._build_copy_query(credentials_block, copy_options)
           expected_copy_query = '''
                                   COPY schema.table
                                   FROM 's3://bucket/key'
                                   with credentials
                                   'aws_access_key_id=aws_access_key_id;aws_secret_access_key=aws_secret_access_key'
                                   ;
                                '''
           assert_equal_ignore_multiple_spaces(self, expected_copy_query, copy_query)
   
           assert mock_run.call_count == 1
           assert access_key in copy_query
           assert secret_key in copy_query
           assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
   ```
    To checking the execution against plain text:
   ```python
           copy_query = '''
                                   COPY schema.table
                                   FROM 's3://bucket/key'
                                   with credentials
                                   'aws_access_key_id=aws_access_key_id;aws_secret_access_key=aws_secret_access_key'
                                   ;
                                '''
           assert mock_run.call_count == 1
           assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
   ```
   




[GitHub] [airflow] mik-laj commented on a change in pull request #16241: Update copy command for s3 to redshift

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r645163066



##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -65,6 +65,45 @@ def test_execute(self, mock_run, mock_session):
         assert secret_key in copy_query
         assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
 
+    @mock.patch("boto3.session.Session")
+    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    def test_execute_with_column_list(self, mock_run, mock_session):
+        access_key = "aws_access_key_id"
+        secret_key = "aws_secret_access_key"
+        mock_session.return_value = Session(access_key, secret_key)
+        mock_session.return_value.access_key = access_key
+        mock_session.return_value.secret_key = secret_key
+        mock_session.return_value.token = None
+
+        schema = "schema"
+        table = "table"
+        s3_bucket = "bucket"
+        s3_key = "key"
+        column_list = ["column_1", "column_2"]
+        copy_options = ""
+
+        op = S3ToRedshiftOperator(
+            schema=schema,
+            table=table,
+            s3_bucket=s3_bucket,
+            s3_key=s3_key,
+            column_list=column_list,
+            copy_options=copy_options,
+            redshift_conn_id="redshift_conn_id",
+            aws_conn_id="aws_conn_id",
+            task_id="task_id",
+            dag=None,
+        )
+        op.execute(None)
+
+        credentials_block = build_credentials_block(mock_session.return_value)
+        copy_query = op._build_copy_query(credentials_block, copy_options)

Review comment:
       Can you copy the result of this function to the test?

##########
File path: airflow/providers/amazon/aws/transfers/s3_to_redshift.py
##########
@@ -90,13 +93,15 @@ def __init__(
         self.redshift_conn_id = redshift_conn_id
         self.aws_conn_id = aws_conn_id
         self.verify = verify
+        self.column_list = column_list
         self.copy_options = copy_options or []
         self.autocommit = autocommit
         self.truncate_table = truncate_table
 
     def _build_copy_query(self, credentials_block: str, copy_options: str) -> str:
+        column_names = "(" + ", ".join(self.column_list) + ")" if self.column_list else None

Review comment:
       ```suggestion
           column_names = "(" + ", ".join(self.column_list) + ")" if self.column_list else ''
   ```
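    
    For context, a quick sketch of why `''` is preferable to `None` here, assuming the surrounding `_build_copy_query` interpolates `column_names` directly into the query string (a hedged illustration, not the exact implementation):
    
    ```python
    column_list = None  # operator used without the new parameter

    # With the `else None` branch, the literal word "None" leaks into the SQL:
    column_names = "(" + ", ".join(column_list) + ")" if column_list else None
    print(f"COPY schema.table {column_names} FROM 's3://bucket/key'")
    # -> COPY schema.table None FROM 's3://bucket/key'   (invalid SQL)

    # With the suggested `else ''`, the clause simply disappears:
    column_names = "(" + ", ".join(column_list) + ")" if column_list else ''
    print(f"COPY schema.table {column_names} FROM 's3://bucket/key'")
    # -> COPY schema.table  FROM 's3://bucket/key'       (extra space is harmless)
    ```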







[GitHub] [airflow] mik-laj commented on a change in pull request #16241: Update copy command for s3 to redshift

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r646650620



##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -59,6 +59,61 @@ def test_execute(self, mock_run, mock_session):
 
         credentials_block = build_credentials_block(mock_session.return_value)
         copy_query = op._build_copy_query(credentials_block, copy_options)
+        expected_copy_query = '''
+                                COPY schema.table
+                                FROM 's3://bucket/key'
+                                with credentials
+                                'aws_access_key_id=aws_access_key_id;aws_secret_access_key=aws_secret_access_key'
+                                ;
+                             '''
+        assert_equal_ignore_multiple_spaces(self, expected_copy_query, copy_query)
+
+        assert mock_run.call_count == 1
+        assert access_key in copy_query
+        assert secret_key in copy_query
+        assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)

Review comment:
       Yes, exactly. We should only check the places where the class connects to another class, not its internal details.




[GitHub] [airflow] sunki-hong commented on pull request #16241: Update copy command for s3 to redshift

Posted by GitBox <gi...@apache.org>.
sunki-hong commented on pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#issuecomment-860046102


   @mik-laj Hi, can you take a look and check if it is okay to merge?





[GitHub] [airflow] potiuk merged pull request #16241: Update copy command for s3 to redshift

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #16241:
URL: https://github.com/apache/airflow/pull/16241


   


[GitHub] [airflow] mik-laj commented on a change in pull request #16241: Update copy command for s3 to redshift

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r646015111



##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -65,6 +65,45 @@ def test_execute(self, mock_run, mock_session):
         assert secret_key in copy_query
         assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
 
+    @mock.patch("boto3.session.Session")
+    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    def test_execute_with_column_list(self, mock_run, mock_session):
+        access_key = "aws_access_key_id"
+        secret_key = "aws_secret_access_key"
+        mock_session.return_value = Session(access_key, secret_key)
+        mock_session.return_value.access_key = access_key
+        mock_session.return_value.secret_key = secret_key
+        mock_session.return_value.token = None
+
+        schema = "schema"
+        table = "table"
+        s3_bucket = "bucket"
+        s3_key = "key"
+        column_list = ["column_1", "column_2"]
+        copy_options = ""
+
+        op = S3ToRedshiftOperator(
+            schema=schema,
+            table=table,
+            s3_bucket=s3_bucket,
+            s3_key=s3_key,
+            column_list=column_list,
+            copy_options=copy_options,
+            redshift_conn_id="redshift_conn_id",
+            aws_conn_id="aws_conn_id",
+            task_id="task_id",
+            dag=None,
+        )
+        op.execute(None)
+
+        credentials_block = build_credentials_block(mock_session.return_value)
+        copy_query = op._build_copy_query(credentials_block, copy_options)

Review comment:
       > FYI expected copy_query looks like this
   
    Can you add it to the tests?




[GitHub] [airflow] sunki-hong commented on a change in pull request #16241: Update copy command for s3 to redshift

Posted by GitBox <gi...@apache.org>.
sunki-hong commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r646123033



##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -65,6 +65,45 @@ def test_execute(self, mock_run, mock_session):
         assert secret_key in copy_query
         assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
 
+    @mock.patch("boto3.session.Session")
+    @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+    def test_execute_with_column_list(self, mock_run, mock_session):
+        access_key = "aws_access_key_id"
+        secret_key = "aws_secret_access_key"
+        mock_session.return_value = Session(access_key, secret_key)
+        mock_session.return_value.access_key = access_key
+        mock_session.return_value.secret_key = secret_key
+        mock_session.return_value.token = None
+
+        schema = "schema"
+        table = "table"
+        s3_bucket = "bucket"
+        s3_key = "key"
+        column_list = ["column_1", "column_2"]
+        copy_options = ""
+
+        op = S3ToRedshiftOperator(
+            schema=schema,
+            table=table,
+            s3_bucket=s3_bucket,
+            s3_key=s3_key,
+            column_list=column_list,
+            copy_options=copy_options,
+            redshift_conn_id="redshift_conn_id",
+            aws_conn_id="aws_conn_id",
+            task_id="task_id",
+            dag=None,
+        )
+        op.execute(None)
+
+        credentials_block = build_credentials_block(mock_session.return_value)
+        copy_query = op._build_copy_query(credentials_block, copy_options)

Review comment:
       Implemented!
   
   SQL without columns
   https://github.com/apache/airflow/blob/d1dc18d9c84c2b3d1f3005a69f93a1adc59476b8/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py#L62-L69
   
   
   SQL with columns
   https://github.com/apache/airflow/blob/d1dc18d9c84c2b3d1f3005a69f93a1adc59476b8/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py#L109-L116



