You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/03 22:08:10 UTC
[GitHub] [airflow] mik-laj commented on a change in pull request #16241: Update copy command for s3 to redshift
mik-laj commented on a change in pull request #16241:
URL: https://github.com/apache/airflow/pull/16241#discussion_r645163066
##########
File path: tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
##########
@@ -65,6 +65,45 @@ def test_execute(self, mock_run, mock_session):
assert secret_key in copy_query
assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
+ @mock.patch("boto3.session.Session")
+ @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+ def test_execute_with_column_list(self, mock_run, mock_session):
+ access_key = "aws_access_key_id"
+ secret_key = "aws_secret_access_key"
+ mock_session.return_value = Session(access_key, secret_key)
+ mock_session.return_value.access_key = access_key
+ mock_session.return_value.secret_key = secret_key
+ mock_session.return_value.token = None
+
+ schema = "schema"
+ table = "table"
+ s3_bucket = "bucket"
+ s3_key = "key"
+ column_list = ["column_1", "column_2"]
+ copy_options = ""
+
+ op = S3ToRedshiftOperator(
+ schema=schema,
+ table=table,
+ s3_bucket=s3_bucket,
+ s3_key=s3_key,
+ column_list=column_list,
+ copy_options=copy_options,
+ redshift_conn_id="redshift_conn_id",
+ aws_conn_id="aws_conn_id",
+ task_id="task_id",
+ dag=None,
+ )
+ op.execute(None)
+
+ credentials_block = build_credentials_block(mock_session.return_value)
+ copy_query = op._build_copy_query(credentials_block, copy_options)
Review comment:
Can you copy the result of this function into the test as a literal expected query, instead of calling `_build_copy_query` here?
##########
File path: airflow/providers/amazon/aws/transfers/s3_to_redshift.py
##########
@@ -90,13 +93,15 @@ def __init__(
self.redshift_conn_id = redshift_conn_id
self.aws_conn_id = aws_conn_id
self.verify = verify
+ self.column_list = column_list
self.copy_options = copy_options or []
self.autocommit = autocommit
self.truncate_table = truncate_table
def _build_copy_query(self, credentials_block: str, copy_options: str) -> str:
+ column_names = "(" + ", ".join(self.column_list) + ")" if self.column_list else None
Review comment:
```suggestion
column_names = "(" + ", ".join(self.column_list) + ")" if self.column_list else ''
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org