Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/12/30 07:51:01 UTC

[GitHub] [airflow] dstandish commented on a change in pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

dstandish commented on a change in pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#discussion_r549979441



##########
File path: airflow/providers/snowflake/transfers/s3_to_snowflake.py
##########
@@ -71,16 +71,14 @@ def __init__(
     def execute(self, context: Any) -> None:
         snowflake_hook = SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id)
 
-        # Snowflake won't accept list of files it has to be tuple only.
-        # but in python tuple([1]) = (1,) => which is invalid for snowflake
-        files = str(self.s3_keys)
-        files = files.replace('[', '(')
-        files = files.replace(']', ')')
+        files = ""
+        if self.s3_keys:
+            files = "files=({})".format(", ".join(f"'{key}'" for key in self.s3_keys))
 
         # we can extend this based on stage
         base_sql = """
                     FROM @{stage}/

Review comment:
       Yeah, so in addition to loading every file in the stage, another supported pattern is having one stage per bucket and specifying a prefix, e.g. `FROM @{stage}/my-prefix/my-subprefix/v1/`.
   
   I think this PR would be a good opportunity to add support for that too. Perhaps it's as simple as adding a `prefix` param and making `from_path=f"{stage}/{prefix}"`; a rough sketch is below. What do you think, @sekikn?
   
   Though if you want to call that out of scope, I'll withdraw the suggestion.
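   
   As a standalone illustration of that idea (the `prefix` param and the `build_copy_query` helper are hypothetical, just to sketch the shape, not part of the current operator):
   
   ```python
   # Hypothetical sketch: an optional `prefix` narrows the COPY to a
   # sub-path of the stage, alongside the existing `files` clause.
   def build_copy_query(schema, table, stage, file_format, s3_keys=None, prefix=None):
       files = ""
       if s3_keys:
           files = "files=({})".format(", ".join(f"'{key}'" for key in s3_keys))
       # e.g. stage='stage', prefix='my-prefix/my-subprefix/v1'
       # yields FROM @stage/my-prefix/my-subprefix/v1/
       from_path = f"{stage}/{prefix}" if prefix else stage
       return f"""
           COPY INTO {schema}.{table}
           FROM @{from_path}/
           {files}
           file_format={file_format}
       """
   
   # prints: COPY INTO schema.table FROM @stage/my-prefix/my-subprefix/v1/ file_format=file_format
   print(build_copy_query("schema", "table", "stage", "file_format",
                          prefix="my-prefix/my-subprefix/v1"))
   ```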

##########
File path: tests/providers/snowflake/transfers/test_s3_to_snowflake.py
##########
@@ -26,6 +26,42 @@
 class TestS3ToSnowflakeTransfer(unittest.TestCase):
     @mock.patch("airflow.providers.snowflake.hooks.snowflake.SnowflakeHook.run")
     def test_execute(self, mock_run):
+        table = 'table'
+        stage = 'stage'
+        file_format = 'file_format'
+        schema = 'schema'
+
+        S3ToSnowflakeOperator(
+            s3_keys=None,
+            table=table,
+            stage=stage,
+            file_format=file_format,
+            schema=schema,
+            columns_array=None,
+            task_id="task_id",
+            dag=None,
+        ).execute(None)
+
+        base_sql = """
+                FROM @{stage}/
+
+                file_format={file_format}
+            """.format(
+            stage=stage, file_format=file_format
+        )
+
+        copy_query = """
+                COPY INTO {schema}.{table} {base_sql}
+            """.format(
+            schema=schema, table=table, base_sql=base_sql
+        )
+        copy_query = "\n".join(line.strip() for line in copy_query.splitlines())
+
+        mock_run.assert_called_once()
+        assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
+
+    @mock.patch("airflow.providers.snowflake.hooks.snowflake.SnowflakeHook.run")
+    def test_execute_with_s3_keys(self, mock_run):

Review comment:
       ```suggestion
   class TestS3ToSnowflakeTransfer(unittest.TestCase):
       @parameterized.expand([
           ('base', dict(table='table',
                         stage='stage',
                         file_format='file_format',
                         schema='schema',
                         ), "COPY INTO schema.table FROM @stage/ file_format=file_format"),
           ('files', dict(
               s3_keys=['1.csv', '2.csv'],
               table='table',
               stage='stage',
               file_format='file_format',
               schema='schema',
           ), "COPY INTO schema.table FROM @stage/ files=('1.csv', '2.csv') file_format=file_format"),
           ('columns', dict(
               table='table',
               stage='stage',
               file_format='file_format',
               schema='schema',
               columns_array=['col1', 'col2', 'col3'],
           ), "COPY INTO schema.table(col1,col2,col3) FROM @stage/ file_format=file_format"),
       ])
       @mock.patch("airflow.providers.snowflake.hooks.snowflake.SnowflakeHook.run")
       def test_execute(self, name, kwargs, expected, mock_run):
           S3ToSnowflakeOperator(
               **kwargs,
               task_id="task_id",
               dag=None,
           ).execute(None)
   
           mock_run.assert_called_once()
           assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], expected)
   ```
    The entire test can be replaced with this; using parameterization makes it much easier to read, IMO.
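    
    One caveat: the suggestion assumes the test module imports `parameterized` at the top, e.g.:
    
    ```python
    # required for the @parameterized.expand decorator in the suggestion above
    from parameterized import parameterized
    ```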



