Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/03 14:12:46 UTC
[GitHub] [airflow] sunki-hong opened a new pull request #16241: Update copy command for s3 to redshift
sunki-hong opened a new pull request #16241:
URL: https://github.com/apache/airflow/pull/16241
<!--
Thank you for contributing! Please make sure that your code changes
are covered with tests. And in case of new features or big changes
remember to adjust the documentation.
Feel free to ping committers for the review!
In case of existing issue, reference it using one of the following:
closes: #ISSUE
related: #ISSUE
How to write a good git commit message:
http://chris.beams.io/posts/git-commit/
-->
Update the COPY command in s3_to_redshift so that column names can be specified when copying files from S3 to Redshift.
```sql
COPY tablename (column1 [,column2, ...])
```
Related AWS documentation ([Link](https://docs.aws.amazon.com/ko_kr/redshift/latest/dg/r_COPY.html)).
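For illustration, a minimal standalone sketch of the resulting query shape. This only approximates the operator's internal query building; the function and argument names here are placeholders, not the actual Airflow API:

```python
def build_copy_query(schema, table, s3_bucket, s3_key, credentials_block,
                     copy_options="", column_list=None):
    # An optional column list renders as "(col1, col2)" after the table
    # name, matching the COPY syntax in the AWS documentation above.
    column_names = "(" + ", ".join(column_list) + ")" if column_list else ""
    return (
        f"COPY {schema}.{table} {column_names}\n"
        f"FROM 's3://{s3_bucket}/{s3_key}'\n"
        f"with credentials\n"
        f"'{credentials_block}'\n"
        f"{copy_options};"
    )

query = build_copy_query(
    "schema", "table", "bucket", "key",
    "aws_access_key_id=...;aws_secret_access_key=...",
    column_list=["column_1", "column_2"],
)
```

With `column_list=None` the parenthesized clause is simply omitted, so existing DAGs are unaffected.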
---
**^ Add meaningful description above**
Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/main/UPDATING.md).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] mik-laj commented on a change in pull request #16241: Update copy command for s3 to redshift
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r646098061
##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -65,6 +65,45 @@ def test_execute(self, mock_run, mock_session):
assert secret_key in copy_query
assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
+ @mock.patch("boto3.session.Session")
+ @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+ def test_execute_with_column_list(self, mock_run, mock_session):
+ access_key = "aws_access_key_id"
+ secret_key = "aws_secret_access_key"
+ mock_session.return_value = Session(access_key, secret_key)
+ mock_session.return_value.access_key = access_key
+ mock_session.return_value.secret_key = secret_key
+ mock_session.return_value.token = None
+
+ schema = "schema"
+ table = "table"
+ s3_bucket = "bucket"
+ s3_key = "key"
+ column_list = ["column_1", "column_2"]
+ copy_options = ""
+
+ op = S3ToRedshiftOperator(
+ schema=schema,
+ table=table,
+ s3_bucket=s3_bucket,
+ s3_key=s3_key,
+ column_list=column_list,
+ copy_options=copy_options,
+ redshift_conn_id="redshift_conn_id",
+ aws_conn_id="aws_conn_id",
+ task_id="task_id",
+ dag=None,
+ )
+ op.execute(None)
+
+ credentials_block = build_credentials_block(mock_session.return_value)
+ copy_query = op._build_copy_query(credentials_block, copy_options)
Review comment:
Can you add tests for SQL without columns?
[GitHub] [airflow] sunki-hong commented on a change in pull request #16241: Update copy command for s3 to redshift
Posted by GitBox <gi...@apache.org>.
sunki-hong commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r646093864
##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -65,6 +65,45 @@ def test_execute(self, mock_run, mock_session):
assert secret_key in copy_query
assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
+ @mock.patch("boto3.session.Session")
+ @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+ def test_execute_with_column_list(self, mock_run, mock_session):
+ access_key = "aws_access_key_id"
+ secret_key = "aws_secret_access_key"
+ mock_session.return_value = Session(access_key, secret_key)
+ mock_session.return_value.access_key = access_key
+ mock_session.return_value.secret_key = secret_key
+ mock_session.return_value.token = None
+
+ schema = "schema"
+ table = "table"
+ s3_bucket = "bucket"
+ s3_key = "key"
+ column_list = ["column_1", "column_2"]
+ copy_options = ""
+
+ op = S3ToRedshiftOperator(
+ schema=schema,
+ table=table,
+ s3_bucket=s3_bucket,
+ s3_key=s3_key,
+ column_list=column_list,
+ copy_options=copy_options,
+ redshift_conn_id="redshift_conn_id",
+ aws_conn_id="aws_conn_id",
+ task_id="task_id",
+ dag=None,
+ )
+ op.execute(None)
+
+ credentials_block = build_credentials_block(mock_session.return_value)
+ copy_query = op._build_copy_query(credentials_block, copy_options)
Review comment:
Oh then like this?
```suggestion
copy_query = op._build_copy_query(credentials_block, copy_options)
expected_copy_query = '''
COPY schema.table (column_1, column_2)
FROM 's3://bucket/key'
with credentials
'aws_access_key_id=aws_access_key_id;aws_secret_access_key=aws_secret_access_key'
;
'''
assert_equal_ignore_multiple_spaces(self, expected_copy_query, copy_query)
```
[GitHub] [airflow] mik-laj commented on a change in pull request #16241: Update copy command for s3 to redshift
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r646124491
##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -59,6 +59,61 @@ def test_execute(self, mock_run, mock_session):
credentials_block = build_credentials_block(mock_session.return_value)
copy_query = op._build_copy_query(credentials_block, copy_options)
+ expected_copy_query = '''
+ COPY schema.table
+ FROM 's3://bucket/key'
+ with credentials
+ 'aws_access_key_id=aws_access_key_id;aws_secret_access_key=aws_secret_access_key'
+ ;
+ '''
+ assert_equal_ignore_multiple_spaces(self, expected_copy_query, copy_query)
+
+ assert mock_run.call_count == 1
+ assert access_key in copy_query
+ assert secret_key in copy_query
+ assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
Review comment:
Here we have access to the query, so we can drop the invocation of private methods and compare this value directly. The test will still check the implementation, but will depend less on internal details.
[GitHub] [airflow] mik-laj commented on a change in pull request #16241: Update copy command for s3 to redshift
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r646097855
##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -65,6 +65,45 @@ def test_execute(self, mock_run, mock_session):
assert secret_key in copy_query
assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
+ @mock.patch("boto3.session.Session")
+ @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+ def test_execute_with_column_list(self, mock_run, mock_session):
+ access_key = "aws_access_key_id"
+ secret_key = "aws_secret_access_key"
+ mock_session.return_value = Session(access_key, secret_key)
+ mock_session.return_value.access_key = access_key
+ mock_session.return_value.secret_key = secret_key
+ mock_session.return_value.token = None
+
+ schema = "schema"
+ table = "table"
+ s3_bucket = "bucket"
+ s3_key = "key"
+ column_list = ["column_1", "column_2"]
+ copy_options = ""
+
+ op = S3ToRedshiftOperator(
+ schema=schema,
+ table=table,
+ s3_bucket=s3_bucket,
+ s3_key=s3_key,
+ column_list=column_list,
+ copy_options=copy_options,
+ redshift_conn_id="redshift_conn_id",
+ aws_conn_id="aws_conn_id",
+ task_id="task_id",
+ dag=None,
+ )
+ op.execute(None)
+
+ credentials_block = build_credentials_block(mock_session.return_value)
+ copy_query = op._build_copy_query(credentials_block, copy_options)
Review comment:
Exactly. Building this SQL is crucial to integrating with a third-party service, so it should show up in tests.
[GitHub] [airflow] sunki-hong commented on a change in pull request #16241: Update copy command for s3 to redshift
Posted by GitBox <gi...@apache.org>.
sunki-hong commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r646774062
##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -59,6 +59,61 @@ def test_execute(self, mock_run, mock_session):
credentials_block = build_credentials_block(mock_session.return_value)
copy_query = op._build_copy_query(credentials_block, copy_options)
+ expected_copy_query = '''
+ COPY schema.table
+ FROM 's3://bucket/key'
+ with credentials
+ 'aws_access_key_id=aws_access_key_id;aws_secret_access_key=aws_secret_access_key'
+ ;
+ '''
+ assert_equal_ignore_multiple_spaces(self, expected_copy_query, copy_query)
+
+ assert mock_run.call_count == 1
+ assert access_key in copy_query
+ assert secret_key in copy_query
+ assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
Review comment:
Done :)
Changed all `copy_query` values to plain text.
Thanks for all the comments @mik-laj
[GitHub] [airflow] sunki-hong commented on a change in pull request #16241: Update copy command for s3 to redshift
Posted by GitBox <gi...@apache.org>.
sunki-hong commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r646005770
##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -65,6 +65,45 @@ def test_execute(self, mock_run, mock_session):
assert secret_key in copy_query
assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
+ @mock.patch("boto3.session.Session")
+ @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+ def test_execute_with_column_list(self, mock_run, mock_session):
+ access_key = "aws_access_key_id"
+ secret_key = "aws_secret_access_key"
+ mock_session.return_value = Session(access_key, secret_key)
+ mock_session.return_value.access_key = access_key
+ mock_session.return_value.secret_key = secret_key
+ mock_session.return_value.token = None
+
+ schema = "schema"
+ table = "table"
+ s3_bucket = "bucket"
+ s3_key = "key"
+ column_list = ["column_1", "column_2"]
+ copy_options = ""
+
+ op = S3ToRedshiftOperator(
+ schema=schema,
+ table=table,
+ s3_bucket=s3_bucket,
+ s3_key=s3_key,
+ column_list=column_list,
+ copy_options=copy_options,
+ redshift_conn_id="redshift_conn_id",
+ aws_conn_id="aws_conn_id",
+ task_id="task_id",
+ dag=None,
+ )
+ op.execute(None)
+
+ credentials_block = build_credentials_block(mock_session.return_value)
+ copy_query = op._build_copy_query(credentials_block, copy_options)
Review comment:
Hi @mik-laj :) Thanks for the review.
Could you explain it in more detail?
Do you want the result of `op._build_copy_query` to be asserted against the expected plain copy text?
I thought that line 105 ` assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)` already checks the result.
FYI expected `copy_query` looks like this
```sql
COPY schema.table (column_1, column_2)
FROM 's3://bucket/key'
with credentials
'aws_access_key_id=aws_access_key_id;aws_secret_access_key=aws_secret_access_key'
;
```
[GitHub] [airflow] sunki-hong commented on a change in pull request #16241: Update copy command for s3 to redshift
Posted by GitBox <gi...@apache.org>.
sunki-hong commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r646130343
##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -59,6 +59,61 @@ def test_execute(self, mock_run, mock_session):
credentials_block = build_credentials_block(mock_session.return_value)
copy_query = op._build_copy_query(credentials_block, copy_options)
+ expected_copy_query = '''
+ COPY schema.table
+ FROM 's3://bucket/key'
+ with credentials
+ 'aws_access_key_id=aws_access_key_id;aws_secret_access_key=aws_secret_access_key'
+ ;
+ '''
+ assert_equal_ignore_multiple_spaces(self, expected_copy_query, copy_query)
+
+ assert mock_run.call_count == 1
+ assert access_key in copy_query
+ assert secret_key in copy_query
+ assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
Review comment:
Thanks for the comment ;)
So would it be nicer to check the execution against plain text rather than by invoking the private method `op._build_copy_query`, like this?
From: checking the execution by invoking the private method
```python
copy_query = op._build_copy_query(credentials_block, copy_options)
expected_copy_query = '''
COPY schema.table
FROM 's3://bucket/key'
with credentials
'aws_access_key_id=aws_access_key_id;aws_secret_access_key=aws_secret_access_key'
;
'''
assert_equal_ignore_multiple_spaces(self, expected_copy_query, copy_query)
assert mock_run.call_count == 1
assert access_key in copy_query
assert secret_key in copy_query
assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
```
To: checking the execution against plain text
```python
copy_query = '''
COPY schema.table
FROM 's3://bucket/key'
with credentials
'aws_access_key_id=aws_access_key_id;aws_secret_access_key=aws_secret_access_key'
;
'''
assert mock_run.call_count == 1
assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
```
[GitHub] [airflow] mik-laj commented on a change in pull request #16241: Update copy command for s3 to redshift
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r645163066
##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -65,6 +65,45 @@ def test_execute(self, mock_run, mock_session):
assert secret_key in copy_query
assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
+ @mock.patch("boto3.session.Session")
+ @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+ def test_execute_with_column_list(self, mock_run, mock_session):
+ access_key = "aws_access_key_id"
+ secret_key = "aws_secret_access_key"
+ mock_session.return_value = Session(access_key, secret_key)
+ mock_session.return_value.access_key = access_key
+ mock_session.return_value.secret_key = secret_key
+ mock_session.return_value.token = None
+
+ schema = "schema"
+ table = "table"
+ s3_bucket = "bucket"
+ s3_key = "key"
+ column_list = ["column_1", "column_2"]
+ copy_options = ""
+
+ op = S3ToRedshiftOperator(
+ schema=schema,
+ table=table,
+ s3_bucket=s3_bucket,
+ s3_key=s3_key,
+ column_list=column_list,
+ copy_options=copy_options,
+ redshift_conn_id="redshift_conn_id",
+ aws_conn_id="aws_conn_id",
+ task_id="task_id",
+ dag=None,
+ )
+ op.execute(None)
+
+ credentials_block = build_credentials_block(mock_session.return_value)
+ copy_query = op._build_copy_query(credentials_block, copy_options)
Review comment:
Can you copy the result of this function into the test?
##########
File path: airflow/providers/amazon/aws/transfers/s3_to_redshift.py
##########
@@ -90,13 +93,15 @@ def __init__(
self.redshift_conn_id = redshift_conn_id
self.aws_conn_id = aws_conn_id
self.verify = verify
+ self.column_list = column_list
self.copy_options = copy_options or []
self.autocommit = autocommit
self.truncate_table = truncate_table
def _build_copy_query(self, credentials_block: str, copy_options: str) -> str:
+ column_names = "(" + ", ".join(self.column_list) + ")" if self.column_list else None
Review comment:
```suggestion
column_names = "(" + ", ".join(self.column_list) + ")" if self.column_list else ''
```
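The suggested change matters because the value is interpolated into the SQL string: with `else None`, Python renders the literal word `None` into the query. A standalone sketch of the difference (the table name and query text here are placeholders):

```python
column_list = []  # no columns supplied

# With `else None`, the literal word "None" leaks into the SQL:
column_names = "(" + ", ".join(column_list) + ")" if column_list else None
assert f"COPY schema.table {column_names}" == "COPY schema.table None"

# With `else ''`, the clause simply disappears. The leftover double space
# is harmless, since the tests compare ignoring repeated whitespace.
column_names = "(" + ", ".join(column_list) + ")" if column_list else ''
assert f"COPY schema.table {column_names}" == "COPY schema.table "
```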
[GitHub] [airflow] mik-laj commented on a change in pull request #16241: Update copy command for s3 to redshift
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r646650620
##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -59,6 +59,61 @@ def test_execute(self, mock_run, mock_session):
credentials_block = build_credentials_block(mock_session.return_value)
copy_query = op._build_copy_query(credentials_block, copy_options)
+ expected_copy_query = '''
+ COPY schema.table
+ FROM 's3://bucket/key'
+ with credentials
+ 'aws_access_key_id=aws_access_key_id;aws_secret_access_key=aws_secret_access_key'
+ ;
+ '''
+ assert_equal_ignore_multiple_spaces(self, expected_copy_query, copy_query)
+
+ assert mock_run.call_count == 1
+ assert access_key in copy_query
+ assert secret_key in copy_query
+ assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
Review comment:
Yes, exactly. We should only check the places where the class connects to another class, not its internal details.
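As a toy illustration of that testing style (the class and names below are invented for the sketch, not Airflow code): assert only on what is handed to the collaborator, never on how it was built.

```python
from unittest import mock

class Operator:
    """Stand-in for the real operator: builds SQL, hands it to a hook."""

    def _build_query(self):
        # Internal detail -- tests should not call this directly.
        return "COPY schema.table FROM 's3://bucket/key';"

    def execute(self, run):
        # The boundary with the collaborating class (the hook's run method).
        run(self._build_query())

run = mock.Mock()
Operator().execute(run)

# Assert only at the boundary: what was passed to the collaborator.
assert run.call_count == 1
assert run.call_args[0][0] == "COPY schema.table FROM 's3://bucket/key';"
```

If `_build_query` is later refactored away, this test keeps passing as long as the same SQL reaches the hook.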
[GitHub] [airflow] sunki-hong commented on pull request #16241: Update copy command for s3 to redshift
Posted by GitBox <gi...@apache.org>.
sunki-hong commented on pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#issuecomment-860046102
@mik-laj Hi, can you take a look and check if it is okay to merge?
[GitHub] [airflow] potiuk merged pull request #16241: Update copy command for s3 to redshift
Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #16241:
URL: https://github.com/apache/airflow/pull/16241
[GitHub] [airflow] mik-laj commented on a change in pull request #16241: Update copy command for s3 to redshift
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r646015111
##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -65,6 +65,45 @@ def test_execute(self, mock_run, mock_session):
assert secret_key in copy_query
assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
+ @mock.patch("boto3.session.Session")
+ @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+ def test_execute_with_column_list(self, mock_run, mock_session):
+ access_key = "aws_access_key_id"
+ secret_key = "aws_secret_access_key"
+ mock_session.return_value = Session(access_key, secret_key)
+ mock_session.return_value.access_key = access_key
+ mock_session.return_value.secret_key = secret_key
+ mock_session.return_value.token = None
+
+ schema = "schema"
+ table = "table"
+ s3_bucket = "bucket"
+ s3_key = "key"
+ column_list = ["column_1", "column_2"]
+ copy_options = ""
+
+ op = S3ToRedshiftOperator(
+ schema=schema,
+ table=table,
+ s3_bucket=s3_bucket,
+ s3_key=s3_key,
+ column_list=column_list,
+ copy_options=copy_options,
+ redshift_conn_id="redshift_conn_id",
+ aws_conn_id="aws_conn_id",
+ task_id="task_id",
+ dag=None,
+ )
+ op.execute(None)
+
+ credentials_block = build_credentials_block(mock_session.return_value)
+ copy_query = op._build_copy_query(credentials_block, copy_options)
Review comment:
> FYI expected copy_query looks like this

Can you add it to the tests?
[GitHub] [airflow] sunki-hong commented on a change in pull request #16241: Update copy command for s3 to redshift
Posted by GitBox <gi...@apache.org>.
sunki-hong commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r646130343
##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -59,6 +59,61 @@ def test_execute(self, mock_run, mock_session):
credentials_block = build_credentials_block(mock_session.return_value)
copy_query = op._build_copy_query(credentials_block, copy_options)
+ expected_copy_query = '''
+ COPY schema.table
+ FROM 's3://bucket/key'
+ with credentials
+ 'aws_access_key_id=aws_access_key_id;aws_secret_access_key=aws_secret_access_key'
+ ;
+ '''
+ assert_equal_ignore_multiple_spaces(self, expected_copy_query, copy_query)
+
+ assert mock_run.call_count == 1
+ assert access_key in copy_query
+ assert secret_key in copy_query
+ assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
Review comment:
Thanks for the comment ;)
So would it be nicer to check execution with plain text rather than invoking the private method `op._build_copy_query`, like this?
From invoking private method
```python
copy_query = op._build_copy_query(credentials_block, copy_options)
expected_copy_query = '''
COPY schema.table
FROM 's3://bucket/key'
with credentials
'aws_access_key_id=aws_access_key_id;aws_secret_access_key=aws_secret_access_key'
;
'''
assert_equal_ignore_multiple_spaces(self, expected_copy_query, copy_query)
assert mock_run.call_count == 1
assert access_key in copy_query
assert secret_key in copy_query
assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
```
To check with plain text
```python
copy_query = '''
COPY schema.table
FROM 's3://bucket/key'
with credentials
'aws_access_key_id=aws_access_key_id;aws_secret_access_key=aws_secret_access_key'
;
'''
assert mock_run.call_count == 1
assert access_key in copy_query
assert secret_key in copy_query
assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
```
[GitHub] [airflow] sunki-hong commented on a change in pull request #16241: Update copy command for s3 to redshift
Posted by GitBox <gi...@apache.org>.
sunki-hong commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r646123033
##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -65,6 +65,45 @@ def test_execute(self, mock_run, mock_session):
assert secret_key in copy_query
assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
+ @mock.patch("boto3.session.Session")
+ @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+ def test_execute_with_column_list(self, mock_run, mock_session):
+ access_key = "aws_access_key_id"
+ secret_key = "aws_secret_access_key"
+ mock_session.return_value = Session(access_key, secret_key)
+ mock_session.return_value.access_key = access_key
+ mock_session.return_value.secret_key = secret_key
+ mock_session.return_value.token = None
+
+ schema = "schema"
+ table = "table"
+ s3_bucket = "bucket"
+ s3_key = "key"
+ column_list = ["column_1", "column_2"]
+ copy_options = ""
+
+ op = S3ToRedshiftOperator(
+ schema=schema,
+ table=table,
+ s3_bucket=s3_bucket,
+ s3_key=s3_key,
+ column_list=column_list,
+ copy_options=copy_options,
+ redshift_conn_id="redshift_conn_id",
+ aws_conn_id="aws_conn_id",
+ task_id="task_id",
+ dag=None,
+ )
+ op.execute(None)
+
+ credentials_block = build_credentials_block(mock_session.return_value)
+ copy_query = op._build_copy_query(credentials_block, copy_options)
Review comment:
Implemented!
SQL without columns
https://github.com/apache/airflow/blob/d1dc18d9c84c2b3d1f3005a69f93a1adc59476b8/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py#L62-L69
SQL with columns
https://github.com/apache/airflow/blob/d1dc18d9c84c2b3d1f3005a69f93a1adc59476b8/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py#L109-L116
[GitHub] [airflow] mik-laj commented on a change in pull request #16241: Update copy command for s3 to redshift
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r646098061
##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -65,6 +65,45 @@ def test_execute(self, mock_run, mock_session):
assert secret_key in copy_query
assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
+ @mock.patch("boto3.session.Session")
+ @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+ def test_execute_with_column_list(self, mock_run, mock_session):
+ access_key = "aws_access_key_id"
+ secret_key = "aws_secret_access_key"
+ mock_session.return_value = Session(access_key, secret_key)
+ mock_session.return_value.access_key = access_key
+ mock_session.return_value.secret_key = secret_key
+ mock_session.return_value.token = None
+
+ schema = "schema"
+ table = "table"
+ s3_bucket = "bucket"
+ s3_key = "key"
+ column_list = ["column_1", "column_2"]
+ copy_options = ""
+
+ op = S3ToRedshiftOperator(
+ schema=schema,
+ table=table,
+ s3_bucket=s3_bucket,
+ s3_key=s3_key,
+ column_list=column_list,
+ copy_options=copy_options,
+ redshift_conn_id="redshift_conn_id",
+ aws_conn_id="aws_conn_id",
+ task_id="task_id",
+ dag=None,
+ )
+ op.execute(None)
+
+ credentials_block = build_credentials_block(mock_session.return_value)
+ copy_query = op._build_copy_query(credentials_block, copy_options)
Review comment:
Can you add tests for SQL without columns also?
[GitHub] [airflow] sunki-hong commented on a change in pull request #16241: Update copy command for s3 to redshift
Posted by GitBox <gi...@apache.org>.
sunki-hong commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r646093864
##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -65,6 +65,45 @@ def test_execute(self, mock_run, mock_session):
assert secret_key in copy_query
assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
+ @mock.patch("boto3.session.Session")
+ @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+ def test_execute_with_column_list(self, mock_run, mock_session):
+ access_key = "aws_access_key_id"
+ secret_key = "aws_secret_access_key"
+ mock_session.return_value = Session(access_key, secret_key)
+ mock_session.return_value.access_key = access_key
+ mock_session.return_value.secret_key = secret_key
+ mock_session.return_value.token = None
+
+ schema = "schema"
+ table = "table"
+ s3_bucket = "bucket"
+ s3_key = "key"
+ column_list = ["column_1", "column_2"]
+ copy_options = ""
+
+ op = S3ToRedshiftOperator(
+ schema=schema,
+ table=table,
+ s3_bucket=s3_bucket,
+ s3_key=s3_key,
+ column_list=column_list,
+ copy_options=copy_options,
+ redshift_conn_id="redshift_conn_id",
+ aws_conn_id="aws_conn_id",
+ task_id="task_id",
+ dag=None,
+ )
+ op.execute(None)
+
+ credentials_block = build_credentials_block(mock_session.return_value)
+ copy_query = op._build_copy_query(credentials_block, copy_options)
Review comment:
Oh then like this?
```suggestion
copy_query = op._build_copy_query(credentials_block, copy_options)
expected_copy_query = '''
COPY schema.table (column_1, column_2)
FROM 's3://bucket/key'
with credentials
'aws_access_key_id=aws_access_key_id;aws_secret_access_key=aws_secret_access_key'
;
'''
assert_equal_ignore_multiple_spaces(self, expected_copy_query, copy_query)
```
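For context, the query construction under discussion can be sketched as a small standalone helper. This is a hypothetical illustration of the technique (an optional parenthesized column clause in a Redshift `COPY` statement), not the operator's actual `_build_copy_query` implementation; the function name and signature here are made up for the example.

```python
def build_copy_query(
    schema: str,
    table: str,
    s3_bucket: str,
    s3_key: str,
    credentials_block: str,
    copy_options: str,
    column_list: list = None,
) -> str:
    # Emit "(col1, col2, ...)" only when specific target columns are given;
    # otherwise COPY loads into all columns of the table.
    column_names = f"({', '.join(column_list)})" if column_list else ""
    return f"""
        COPY {schema}.{table} {column_names}
        FROM 's3://{s3_bucket}/{s3_key}'
        with credentials
        '{credentials_block}'
        {copy_options};
    """


# With a column list, the clause appears after the table name,
# matching the expected query in the suggestion above.
query = build_copy_query(
    "schema", "table", "bucket", "key",
    "aws_access_key_id=aws_access_key_id;aws_secret_access_key=aws_secret_access_key",
    "",
    column_list=["column_1", "column_2"],
)
assert "COPY schema.table (column_1, column_2)" in query
```

Without `column_list`, the same helper produces the plain `COPY schema.table FROM ...` form (modulo whitespace), which is why the review asks for a test covering both shapes.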