Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/11/20 14:19:42 UTC

[GitHub] [airflow] sekikn opened a new pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

sekikn opened a new pull request #12505:
URL: https://github.com/apache/airflow/pull/12505


   Currently, users have to specify each file to upload via
   the "s3_keys" parameter when using S3ToSnowflakeOperator.
   But the `COPY INTO` statement, which S3ToSnowflakeOperator
   leverages internally, allows omitting this parameter,
   in which case all files in the specified stage are loaded.
   https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html#syntax
   
   This PR makes S3ToSnowflakeOperator's s3_keys parameter optional
   to support this functionality.
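   
   As a rough sketch (not the exact diff, just the shape of the change, using
   the operator's own parameter names), the COPY statement construction with the
   now-optional parameter looks like this:
   
   ```
   # Sketch only: mirrors how the operator builds its COPY INTO statement
   # when s3_keys is omitted.
   def build_copy_sql(schema, table, stage, file_format, s3_keys=None):
       files = ""
       if s3_keys:
           files = "files=({})".format(", ".join(f"'{key}'" for key in s3_keys))
       return f"""
           COPY INTO {schema}.{table}
               FROM @{stage}/
               {files}
               file_format={file_format}
       """
   
   # Without s3_keys, the FILES clause is dropped and Snowflake loads
   # every file in the stage:
   print(build_copy_sql("FOO.BAR", "WEATHER", "FOO.BAR.S3_STG", "(TYPE=AVRO)"))
   ```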
   
   In addition to adding a corresponding test case, I manually ran the revised
   version of S3ToSnowflakeOperator and confirmed that it worked, as follows:
   
   ----
   
   Before executing S3ToSnowflakeOperator, no record is in the target table (via Snowflake CLI):
   
   ```
   $ bin/snowsql --accountname *******.ap-southeast-1 --username sekikn --dbname FOO --schemaname BAR
   Password: 
   * SnowSQL * v1.2.10
   Type SQL statements or !help
   sekikn#COMPUTE_WH@FOO.BAR>LIST @S3_STG;
   +-------------------------------------+------+----------------------------------+-------------------------------+
   | name                                | size | md5                              | last_modified                 |
   |-------------------------------------+------+----------------------------------+-------------------------------|
   | s3://****/input/weather-snappy.avro |  330 | f1b4996b74a1a0de9e6ffa7d644f97a9 | Fri, 20 Nov 2020 02:42:15 GMT |
   | s3://****/input/weather.avro        |  358 | 0a1cfdeca549207de10f431c8ed6dd4c | Fri, 20 Nov 2020 01:40:50 GMT |
   +-------------------------------------+------+----------------------------------+-------------------------------+
   2 Row(s) produced. Time Elapsed: 3.046s
   sekikn#COMPUTE_WH@FOO.BAR>SELECT COUNT(*) FROM WEATHER;
   +----------+                                                                    
   | COUNT(*) |
   |----------|
   |        0 |
   +----------+
   1 Row(s) produced. Time Elapsed: 0.161s
   ```
   
   Execute S3ToSnowflakeOperator without s3_keys (via Python REPL):
   
   ```
   In [1]: from airflow.providers.snowflake.transfers.s3_to_snowflake import S3ToSnowflakeOperator
   [2020-11-20 19:55:02,997] {arrow_result.pyx:0} INFO - Failed to import optional packages, pyarrow
   
   In [2]: t = S3ToSnowflakeOperator(stage="FOO.BAR.S3_STG", file_format="(TYPE=AVRO)", schema="FOO.BAR", table="WEATHER", task_id="tid")
   
   In [3]: t.execute(None)
   [2020-11-20 19:55:13,968] {connection.py:210} INFO - Snowflake Connector for Python Version: 2.3.6, Python Version: 3.6.9, Platform: Linux-5.4.0-53-generic-x86_64-with-Ubuntu-18.04-bionic
   [2020-11-20 19:55:13,968] {connection.py:744} INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
   [2020-11-20 19:55:13,968] {connection.py:760} INFO - Setting use_openssl_only mode to False
   [2020-11-20 19:55:14,780] {cursor.py:531} INFO - query: [ALTER SESSION SET autocommit=True]
   [2020-11-20 19:55:14,946] {cursor.py:553} INFO - query execution done
   [2020-11-20 19:55:14,946] {dbapi_hook.py:179} INFO - Running statement: 
                   COPY INTO FOO.BAR.WEATHER 
                       FROM @FOO.BAR.S3_STG/
                       
                       file_format=(TYPE=AVRO)
                   
               , parameters: None
   [2020-11-20 19:55:14,946] {cursor.py:531} INFO - query: [COPY INTO FOO.BAR.WEATHER FROM @FOO.BAR.S3_STG/  file_format=(TYPE=AVRO)]
   [2020-11-20 19:55:19,840] {cursor.py:553} INFO - query execution done
   [2020-11-20 19:55:19,840] {dbapi_hook.py:185} INFO - Rows affected: 2
   [2020-11-20 19:55:19,840] {connection.py:430} INFO - closed
   ```
   
   And all files are successfully uploaded (via Snowflake CLI):
   
   ```
   sekikn#COMPUTE_WH@FOO.BAR>SELECT COUNT(*) FROM WEATHER;
   +----------+                                                                    
   | COUNT(*) |
   |----------|
   |       10 |
   +----------+
   1 Row(s) produced. Time Elapsed: 0.746s
   ```
   
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish edited a comment on pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
dstandish edited a comment on pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#issuecomment-761838898


   @potiuk just a thing to keep in mind on this topic of transfer operators.  There is no s3 hook involved here. This is a pure snowflake operator.  It's just executing a snowflake command.  So in that sense, it's not properly a transfer operator.  Yes, snowflake is accessing storage, but the operator knows nothing about this (Snowflake manages it).  And there's no reason to artificially restrict its focus to s3 in the naming. 
   
   Though to be clear, I'm not saying this particular PR shouldn't go through. I'm just making a case for fixing the naming, if not now then later.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#issuecomment-761890384


   > It's like having a MySqlINNODBOperator and a MySqlMyISAMOperator; either way you're just executing sql.
   
   If this is close to 100% of code - that's cool.
   
   How about backwards compatibility then? We have the rule that we do not remove stuff (semver versioning for providers is clear about it). 
   
   We have an existing S3ToSnowflake operator with s3_keys as a parameter. 
   
   From what I see, what you propose is to have an AnyStorageToSnowflake operator. 
   
   How about adding (as a next step) such an operator while keeping S3ToSnowflake as a proxy to it (converting s3_keys to 'storage_keys' - but throwing a deprecation warning)? Then you could system-test the other cases (Google/Azure) and add the general operator next to the existing one. The 'extra' code would just be mapping init parameters, and in the 2.0.0 version of the snowflake provider we could drop it.
   
   How about that?
   
   There is a big value in keeping backwards-compatibility. We've done that for some 500 operators when we moved them from "contrib" to "providers". This is not really a 'waste' - this is thinking about your users.
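   
   To illustrate (purely hypothetical - neither class exists yet, and the general
   operator's name is just one of the candidates floated later in this thread),
   the proxy could look roughly like this:
   
   ```
   # Hypothetical sketch of the backwards-compatible proxy described above.
   # Class and parameter names are illustrative, not merged code.
   import warnings
   
   class SnowflakeLoadOperator:  # stand-in for the proposed general operator
       def __init__(self, *, storage_keys=None, **kwargs):
           self.storage_keys = storage_keys
   
   class S3ToSnowflakeOperator(SnowflakeLoadOperator):
       def __init__(self, *, s3_keys=None, **kwargs):
           warnings.warn(
               "S3ToSnowflakeOperator is deprecated; use the general operator "
               "with storage_keys instead.",
               DeprecationWarning,
               stacklevel=2,
           )
           super().__init__(storage_keys=s3_keys, **kwargs)
   ```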
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish edited a comment on pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
dstandish edited a comment on pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#issuecomment-760392600


   one observation, which we might not want to deal with at this time but i'll make it anyway..
   
   snowflake supports azure  / s3 / gcs
   
   this operator assumes a stage and not any particular storage backend, so i am pretty sure it works identically with gcp and azure.
   
   so it might be that we should deprecate this and rename it, not as a transfer operator but as a SnowflakeCopyOperator
   
   we'd just need to rename s3_keys to files
   
   additionally, i think it should eventually be updated to parse the results and fail as appropriate, which could depend on whether there are skipped or failed files and should be configurable.  but i'm sure you'll want to consider this out of scope for this change.
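   
   for what it's worth, a rough sketch of what that check could look like (out of
   scope here; it assumes the COPY INTO result set exposes a per-file status such
   as LOADED / PARTIALLY_LOADED / LOAD_FAILED, per Snowflake's COPY INTO docs, and
   that the status sits in the second column):
   
   ```
   # sketch only - not part of this PR; column layout and status values are
   # assumptions based on Snowflake's documented COPY INTO output
   def check_copy_results(rows, allow_partial=False):
       """rows: records returned by the COPY INTO statement, e.g. via
       SnowflakeHook(...).get_records(copy_query)."""
       statuses = [row[1] for row in rows]  # assumed (file, status, ...) layout
       if "LOAD_FAILED" in statuses:
           raise ValueError(f"COPY INTO reported failed files: {statuses}")
       if not allow_partial and "PARTIALLY_LOADED" in statuses:
           raise ValueError(f"COPY INTO reported partially loaded files: {statuses}")
   ```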


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] sekikn commented on a change in pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
sekikn commented on a change in pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#discussion_r554579260



##########
File path: airflow/providers/snowflake/transfers/s3_to_snowflake.py
##########
@@ -71,16 +71,14 @@ def __init__(
     def execute(self, context: Any) -> None:
         snowflake_hook = SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id)
 
-        # Snowflake won't accept list of files it has to be tuple only.
-        # but in python tuple([1]) = (1,) => which is invalid for snowflake
-        files = str(self.s3_keys)
-        files = files.replace('[', '(')
-        files = files.replace(']', ')')
+        files = ""
+        if self.s3_keys:
+            files = "files=({})".format(", ".join(f"'{key}'" for key in self.s3_keys))
 
         # we can extend this based on stage
         base_sql = """
                     FROM @{stage}/

Review comment:
       Thanks! My first thought was that users could specify the prefix within the "stage" variable, but your design is actually cleaner. Just added it.
   I manually confirmed that the parameter worked, as follows:
   
   Given an example S3 bucket with the following structure (each file contains five records),
   
   ```
   $ aws s3 ls --recursive s3://****/input
   2020-11-20 10:40:25          0 input/
   2021-01-10 23:04:49          0 input/subdir/
   2021-01-10 23:39:35        335 input/subdir/weather-sorted.avro
   2020-11-20 11:42:15        330 input/weather-snappy.avro
   2020-11-20 10:40:50        358 input/weather.avro
   ```
   
   and the following stage and target table:
   
   ```
   sekikn#COMPUTE_WH@FOO.BAR>DESC STAGE S3_STG;
   +--------------------+---------------------+---------------+------------------------------------------------+------------------+
   | parent_property    | property            | property_type | property_value                                 | property_default |
   |--------------------+---------------------+---------------+------------------------------------------------+------------------|
   
   (snip)
   
   | STAGE_COPY_OPTIONS | FORCE               | Boolean       | true                                           | false            |
   | STAGE_LOCATION     | URL                 | String        | ["s3://****/input/"]                           |                  |
   | STAGE_INTEGRATION  | STORAGE_INTEGRATION | String        | S3_INT                                         |                  |
   
   (snip)
   
   sekikn#COMPUTE_WH@FOO.BAR>DESC TABLE WEATHER;
   +------+---------+--------+-------+---------+-------------+------------+-------+------------+---------+
   | name | type    | kind   | null? | default | primary key | unique key | check | expression | comment |
   |------+---------+--------+-------+---------+-------------+------------+-------+------------+---------|
   | C    | VARIANT | COLUMN | Y     | NULL    | N           | N          | NULL  | NULL       | NULL    |
   +------+---------+--------+-------+---------+-------------+------------+-------+------------+---------+
   1 Row(s) produced. Time Elapsed: 0.568s
   sekikn#COMPUTE_WH@FOO.BAR>SELECT COUNT(*) FROM WEATHER;
   +----------+                                                                    
   | COUNT(*) |
   |----------|
   |        0 |
   +----------+
   1 Row(s) produced. Time Elapsed: 0.575s
   ```
   
   First, ran S3ToSnowflakeOperator without prefix,
   
   ```
   In [1]: from airflow.providers.snowflake.transfers.s3_to_snowflake import S3ToSnowflakeOperator
   /home/sekikn/venv/a/lib/python3.8/site-packages/snowflake/connector/options.py:78 UserWarning: You have an incompatible version of 'pyarrow' installed (2.0.0), please install a version that adheres to: 'pyarrow<0.18.0,>=0.17.0; extra == "pandas"'
   
   In [2]: t = S3ToSnowflakeOperator(stage="FOO.BAR.S3_STG", file_format="(TYPE=AVRO)", schema="FOO.BAR", table="WEATHER", task_id="tid")
   
   In [3]: t.execute(None)
   [2021-01-10 23:46:01,029] {connection.py:206} INFO - Snowflake Connector for Python Version: 2.3.6, Python Version: 3.8.5, Platform: Linux-5.8.0-36-generic-x86_64-with-glibc2.29
   [2021-01-10 23:46:01,030] {connection.py:743} INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
   [2021-01-10 23:46:01,030] {connection.py:759} INFO - Setting use_openssl_only mode to False
   [2021-01-10 23:46:01,783] {cursor.py:530} INFO - query: [ALTER SESSION SET autocommit=True]
   [2021-01-10 23:46:01,927] {cursor.py:553} INFO - query execution done
   [2021-01-10 23:46:01,927] {dbapi.py:180} INFO - Running statement: 
   COPY INTO FOO.BAR.WEATHER
   FROM @FOO.BAR.S3_STG/
   
   file_format=(TYPE=AVRO)
   
   , parameters: None
   [2021-01-10 23:46:01,928] {cursor.py:530} INFO - query: [COPY INTO FOO.BAR.WEATHER FROM @FOO.BAR.S3_STG/  file_format=(TYPE=AVRO)]
   [2021-01-10 23:46:02,657] {cursor.py:553} INFO - query execution done
   [2021-01-10 23:46:02,658] {dbapi.py:186} INFO - Rows affected: 3
   [2021-01-10 23:46:02,659] {connection.py:430} INFO - closed
   ```
   
   and made sure 15 records were correctly uploaded via snowsql:
   
   ```
   sekikn#COMPUTE_WH@FOO.BAR>SELECT COUNT(*) FROM WEATHER;
   +----------+                                                                    
   | COUNT(*) |
   |----------|
   |       15 |
   +----------+
   1 Row(s) produced. Time Elapsed: 1.126s
   ```
   
   Then, ran S3ToSnowflakeOperator again **with** prefix:
   
   ```
   In [4]: t = S3ToSnowflakeOperator(stage="FOO.BAR.S3_STG", prefix="subdir", file_format="(TYPE=AVRO)", schema="FOO.BAR", table="WEATHER", task_id="tid")
   
   In [5]: t.execute(None)
   [2021-01-10 23:47:56,478] {connection.py:206} INFO - Snowflake Connector for Python Version: 2.3.6, Python Version: 3.8.5, Platform: Linux-5.8.0-36-generic-x86_64-with-glibc2.29
   [2021-01-10 23:47:56,478] {connection.py:743} INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
   [2021-01-10 23:47:57,155] {cursor.py:530} INFO - query: [ALTER SESSION SET autocommit=True]
   [2021-01-10 23:47:57,290] {cursor.py:553} INFO - query execution done
   [2021-01-10 23:47:57,290] {dbapi.py:180} INFO - Running statement: 
   COPY INTO FOO.BAR.WEATHER
   FROM @FOO.BAR.S3_STG/subdir
   
   file_format=(TYPE=AVRO)
   
   , parameters: None
   [2021-01-10 23:47:57,291] {cursor.py:530} INFO - query: [COPY INTO FOO.BAR.WEATHER FROM @FOO.BAR.S3_STG/subdir  file_format=(TYPE=AVRO)]
   [2021-01-10 23:47:57,947] {cursor.py:553} INFO - query execution done
   [2021-01-10 23:47:57,948] {dbapi.py:186} INFO - Rows affected: 1
   [2021-01-10 23:47:57,948] {connection.py:430} INFO - closed
   ```
   
   and made sure only five records were additionally uploaded this time:
   
   ```
   sekikn#COMPUTE_WH@FOO.BAR>SELECT COUNT(*) FROM WEATHER;
   +----------+                                                                    
   | COUNT(*) |
   |----------|
   |       20 |
   +----------+
   1 Row(s) produced. Time Elapsed: 0.628s
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on a change in pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#discussion_r549979441



##########
File path: airflow/providers/snowflake/transfers/s3_to_snowflake.py
##########
@@ -71,16 +71,14 @@ def __init__(
     def execute(self, context: Any) -> None:
         snowflake_hook = SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id)
 
-        # Snowflake won't accept list of files it has to be tuple only.
-        # but in python tuple([1]) = (1,) => which is invalid for snowflake
-        files = str(self.s3_keys)
-        files = files.replace('[', '(')
-        files = files.replace(']', ')')
+        files = ""
+        if self.s3_keys:
+            files = "files=({})".format(", ".join(f"'{key}'" for key in self.s3_keys))
 
         # we can extend this based on stage
         base_sql = """
                     FROM @{stage}/

Review comment:
       Yeah, so in addition to loading every file in the stage, another supported pattern is having one stage per bucket and specifying a prefix, e.g. `FROM @{stage}/my-prefix/my-subprefix/v1/`
   
   i think this PR would be a good opportunity to add support for that too.  perhaps it's as simple as adding a param `prefix` and making `from_path=f"{stage}/{prefix}"` - what do you think @sekikn?
   
   though if you want to call that out of scope i'll withdraw the suggestion.
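   
   for concreteness, a tiny sketch of what i mean (purely illustrative - param name
   and from-path construction exactly as suggested above, nothing more):
   
   ```
   # illustrative only - the actual change is up to the PR author
   def build_from_clause(stage, prefix=None):
       from_path = f"{stage}/{prefix}" if prefix else f"{stage}/"
       return f"FROM @{from_path}"
   
   print(build_from_clause("FOO.BAR.S3_STG"))            # FROM @FOO.BAR.S3_STG/
   print(build_from_clause("FOO.BAR.S3_STG", "subdir"))  # FROM @FOO.BAR.S3_STG/subdir
   ```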

##########
File path: tests/providers/snowflake/transfers/test_s3_to_snowflake.py
##########
@@ -26,6 +26,42 @@
 class TestS3ToSnowflakeTransfer(unittest.TestCase):
     @mock.patch("airflow.providers.snowflake.hooks.snowflake.SnowflakeHook.run")
     def test_execute(self, mock_run):
+        table = 'table'
+        stage = 'stage'
+        file_format = 'file_format'
+        schema = 'schema'
+
+        S3ToSnowflakeOperator(
+            s3_keys=None,
+            table=table,
+            stage=stage,
+            file_format=file_format,
+            schema=schema,
+            columns_array=None,
+            task_id="task_id",
+            dag=None,
+        ).execute(None)
+
+        base_sql = """
+                FROM @{stage}/
+
+                file_format={file_format}
+            """.format(
+            stage=stage, file_format=file_format
+        )
+
+        copy_query = """
+                COPY INTO {schema}.{table} {base_sql}
+            """.format(
+            schema=schema, table=table, base_sql=base_sql
+        )
+        copy_query = "\n".join(line.strip() for line in copy_query.splitlines())
+
+        mock_run.assert_called_once()
+        assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
+
+    @mock.patch("airflow.providers.snowflake.hooks.snowflake.SnowflakeHook.run")
+    def test_execute_with_s3_keys(self, mock_run):

Review comment:
       ```suggestion
   class TestS3ToSnowflakeTransfer(unittest.TestCase):
       @parameterized.expand([
           ('base', dict(table='table',
                         stage='stage',
                         file_format='file_format',
                         schema='schema',
                         ), "COPY INTO schema.table FROM @stage/ file_format=file_format"),
           ('files', dict(
               s3_keys=['1.csv', '2.csv'],
               table='table',
               stage='stage',
               file_format='file_format',
               schema='schema',
           ), "COPY INTO schema.table FROM @stage/ files=('1.csv', '2.csv') file_format=file_format"),
           ('columns', dict(
               table='table',
               stage='stage',
               file_format='file_format',
               schema='schema',
               columns_array=['col1', 'col2', 'col3'],
           ), "COPY INTO schema.table(col1,col2,col3) FROM @stage/ file_format=file_format"),
       ])
       @mock.patch("airflow.providers.snowflake.hooks.snowflake.SnowflakeHook.run")
       def test_execute(self, name, kwargs, expected, mock_run):
           S3ToSnowflakeOperator(
               **kwargs,
               task_id="task_id",
               dag=None,
           ).execute(None)
   
           mock_run.assert_called_once()
           assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], expected)
   ```
   the entire test can be replaced with this; using parameterization makes it much easier to read IMO
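   
   (the suggestion above assumes the usual imports at the top of the test module,
   in particular the third-party `parameterized` helper:)
   
   ```
   from unittest import mock
   
   from parameterized import parameterized
   ```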




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk edited a comment on pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#issuecomment-762086664


   @dstandish  Actually I have no problem with those "generalizations" being regular (non-transfer) operators. This is perfectly fine and we already have such operators. 
   
   Look here: https://github.com/apache/airflow/blob/master/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py
   
   This is the "embodiment" of what I was talking about. We have `TransferJobPreprocessor` class and `CloudDataTransferServiceHoook` and they serve similar purpose - bring data from "somewhere" to `GCS`. This "Processor" and "Hook" classes are used by composition in few operators:
   
   * CloudDataTransferServiceGCSToGCSOperator
   * CloudDataTransferServiceS3ToGCSOperator
   
   What those operators do is merely pass the credentials and fire the job that transfers the data.
   
   But it is a bit more complex because it is an 'asynchronous' operator - you can fire the job and monitor its output/cancel it if needed. It would be great if we also had a similar approach in the "generalization" of the "Load" operator (not necessarily, but it would be nice). 
   
   As I understand it now - we are not actually "running the transfer ourselves" and we are not even passing the AWS credentials to snowflake. Am I correct? What got me confused is that in the SnowflakeHook there is this method:
   
   ```
       def _get_aws_credentials(self) -> Tuple[Optional[Any], Optional[Any]]:
           """
           Returns aws_access_key_id, aws_secret_access_key
           from extra
   
           intended to be used by external import and export statements
           """
   ```
   Which is - apparently - not used and not needed - it retrieves AWS credentials from extras of the Snowflake(!) connection it seems - which is rather cumbersome. So I'd rather fix it as well, remove the method and replace it with the actual Airflow AWS connection if we needed it at all. But I understand from your explanation that this LOAD method simply takes the file ids of whatever cloud storage we provide, and Snowflake already has to have the right credentials configured to be able to read those.
   
   Am I right?  If so, this is not a "Transfer" operator at all.  It's a regular Operator that fires external job and waits for it :).
   
   The `SnowflakeLoadOperator` is a good name, BTW!
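   
   If we ever did need those credentials, a minimal sketch of "use the actual
   Airflow AWS connection" (assuming the Amazon provider's `AwsBaseHook`, which
   exposes `get_credentials()`) could be:
   
   ```
   # Sketch only - illustrative rather than a proposal; today the method in
   # SnowflakeHook is unused, so it may simply be removed instead.
   from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
   
   def get_aws_credentials(aws_conn_id: str = "aws_default"):
       creds = AwsBaseHook(aws_conn_id=aws_conn_id, client_type="s3").get_credentials()
       return creds.access_key, creds.secret_key
   ```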


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] sekikn commented on a change in pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
sekikn commented on a change in pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#discussion_r529125852



##########
File path: tests/providers/snowflake/transfers/test_s3_to_snowflake.py
##########
@@ -26,6 +26,41 @@
 class TestS3ToSnowflakeTransfer(unittest.TestCase):
     @mock.patch("airflow.providers.snowflake.hooks.snowflake.SnowflakeHook.run")
     def test_execute(self, mock_run):
+        table = 'table'
+        stage = 'stage'
+        file_format = 'file_format'
+        schema = 'schema'
+
+        S3ToSnowflakeOperator(
+            s3_keys=None,
+            table=table,
+            stage=stage,
+            file_format=file_format,
+            schema=schema,
+            columns_array=None,
+            task_id="task_id",
+            dag=None,
+        ).execute(None)
+
+        base_sql = """
+                FROM @{stage}/
+                
+                file_format={file_format}
+            """.format(
+            stage=stage, file_format=file_format
+        )
+
+        copy_query = """
+                COPY INTO {schema}.{table} {base_sql}
+            """.format(
+            schema=schema, table=table, base_sql=base_sql
+        )
+
+        assert mock_run.call_count == 1

Review comment:
       Thank you for the comment, @michalslowikowski00! Updated the PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#issuecomment-761814656


   @dstandish  I think for now we can merge it as it is. We are going to release the snowflake provider soon, so I think having just s3 for now is ok - we can always extract the "Copy" common code later, but I also have a feeling that S3ToSnowflake/GCSToSnowflake etc. should remain separate operators (at the very least the init parameters will be different for those ones). We have a generic "transfer" operator in the works to handle the proposed "any-to-any" transfer mechanism, and there is already a design doc in place for this one:
   
   https://lists.apache.org/thread.html/rc888a329f1c49622c0123c2ddbcfcc107eead020b774f8a8fab6d7f1%40%3Cdev.airflow.apache.org%3E  
   
   This should allow implementing any "storage" and "sql" transfer by just providing the right hook. That is the "ultimate" generalisation, but even it assumes that we will continue to have specialized "XtoY" operators in many cases, because there are specific requirements or capabilities of those "specialized" solutions that allow for some kind of optimisations (for example parallel transfer of many files) or custom behaviors (for example generating the bigquery schema for ToGCS transfers). 
   
   I think it's best to add the s3 -> snowflake fix and think about generalizations independently.
   
   @kurtqq @sekikn  -> can you please rebase this one onto the latest master? Snowflake finally released a version of the python connector that does not break other providers, and I just merged (on Friday) the change that incorporates it and fixes some other dependencies, so it's worth testing that it works. I plan to release a set of providers soon and I would love this one to be merged! 
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish edited a comment on pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
dstandish edited a comment on pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#issuecomment-761863984


   What if a full _100%_ of the code is the same?
   
   I believe 100% of the code can and should be the same, and anything else is unnecessary complexity.
   
   We simply don't need to make _any_ assumptions about storage.  See here: https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
   
   It's like having a MySqlINNODBOperator and a MySqlMyISAMOperator; either way you're just executing sql.
   
   Or perhaps a MySqlINNODBToMyISAMTransferOperator -- an artificial and unproductive distinction.
   
   At least that's _my_ take :) 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#issuecomment-762389075


   Crystal clear @dstandish! I am merging it now and the refactor to a "Load" operator can be done as the next step!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk merged pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #12505:
URL: https://github.com/apache/airflow/pull/12505


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk edited a comment on pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#issuecomment-761849270


   > @potiuk just a thing to keep in mind on this topic of transfer operators. There is no s3 hook involved here. This is a pure snowflake operator. It's just executing a snowflake command. So in that sense, it's not properly a transfer operator. Yes, snowflake is accessing storage, but the operator knows nothing about this (Snowflake manages it). So there's no reason to artificially restrict its focus to s3 in the naming.
   > 
   > To be clear, I'm not saying this particular PR shouldn't go through. It's a good improvement. I'm just making a case for fixing the naming, if not now then later.
   
   Surely I understand that, but my line of thinking does not change here. It does not matter if this is a 'proper' transfer operator and whether a hook is used or not. I simply argue that for the user, having a separate "S3ToSnowflake", "GCSToSnowflake", "AzureBlobToSnowflake" is perfectly fine, and even intended. 
   
   Even if they will eventually re-use a lot of the same code, those can be implemented as 3 separate operators, with common code reuse (either inheritance or composition). As a user, I'd very much prefer to have a separate S3ToSnowflake with S3-only parameters, GCSToSnowflake with Google ones and AzureBlobToSnowflake with only azure ones. 
   
   Generally speaking - even if eventually 90% of the code under the hood will be the same - it can be extracted as a next step when GCS/Azure operators are added, without breaking backwards compatibility of the current S3ToSnowflake operator. The fact that snowflake allows interacting with all three storages makes that possible and allows sharing the code (again - later). This way, at this stage, we save on system tests - the person who implements the S3 one, having the credentials etc., would not also have to create Azure and GCS accounts to test that everything works fine for those. That can be done by the next person, who will have the S3 unit tests to check that the basic S3 assumptions are not broken, and can add GCS on top and test it thoroughly then. 
   
   I do not know exactly whether there are differences in how the GCS/S3 export works in this snowflake API, but I'd assume that you can still control whether the BQ schema is created, for example (this is a typical use case for exporting to GCS). And I think having a "thin" layer of S3ToS..., GCSToS... over the native Snowflake capability is much closer to the philosophy of many current Airflow operators. 
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#issuecomment-762389245


   Thanks @sekikn !


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] sekikn commented on pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
sekikn commented on pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#issuecomment-761999221


   Regarding this PR, I'll just rebase it soon so that Jarek can merge it in.
   I agree with renaming this operator in another PR, while keeping the `S3ToSnowflakeOperator` as a proxy to provide backward compatibility to users, at least for a while.
   I'd prefer `SnowflakeLoadOperator` (and `SnowflakeUnloadOperator` for the reverse operation) as the new name, for its conciseness. "Load"/"unload" are also the terms the official Snowflake documentation uses for the operation in question.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
dstandish commented on pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#issuecomment-762337192


   > As I understand it now - we are not actually "running the transfer ourselves" and we are not even passing the AWS credentials to snowflake. 
   
   This is correct.   A standard pattern is that you define a "stage" in snowflake which is simply a pointer to a bucket.    It can use GCS / Azure / s3.  When you do this you don't need to supply creds in sql commands, and the sql is the same no matter the underlying storage.   
   
   When loading you supply stage + prefix and it will find your files.  No need to specify specific files (snowflake will actually keep track of what it has loaded before).
   
   Passing a path and credentials (i.e. instead of creating a stage) is supported, but then you have your s3 creds sitting in logs, so i think it's not a great idea.  And this operator assumes a stage anyway. 
   
   > What got me confused is that in the SnowflakeHook there is this method `_get_aws_credentials`
   
   Yes I think this is not needed at all and not used.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#issuecomment-731474813


   [The Workflow run](https://github.com/apache/airflow/actions/runs/375416447) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
dstandish commented on pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#issuecomment-761925481


   I would propose calling it `SnowflakeCopyOperator`.  And re keys, we can stick with the naming in snowflake which is `files`.
   
   And yes this sounds great, stand up the new operator alongside the s3 one and refactor s3 one to use the new one and deprecate s3 one.  I don't know if I'll have time to contribute but I may try to squeeze it in.  I have worked out some good logic for parsing load results before.
   
   Re naming, I know you are partial to `Transfer` or `To`, but my view is that in this context it's not best thought of as a transfer.  It's just a Snowflake operation - an operator for running copy commands.
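
   A rough sketch of what such an operator could look like (names, parameters and the exact SQL layout here are placeholders, not a final API):

   ```python
   from typing import Optional, Sequence

   from airflow.models import BaseOperator
   from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook


   class SnowflakeCopyOperator(BaseOperator):
       """Runs COPY INTO <table> from an existing Snowflake stage (any storage backend)."""

       def __init__(
           self,
           *,
           table: str,
           stage: str,
           file_format: str,
           schema: Optional[str] = None,
           files: Optional[Sequence[str]] = None,
           snowflake_conn_id: str = "snowflake_default",
           **kwargs,
       ):
           super().__init__(**kwargs)
           self.table = table
           self.stage = stage
           self.file_format = file_format
           self.schema = schema
           self.files = files
           self.snowflake_conn_id = snowflake_conn_id

       def execute(self, context):
           # The files clause is optional: omitting it loads everything in the stage.
           files_clause = ""
           if self.files:
               files_clause = "files=({})".format(", ".join("'{}'".format(f) for f in self.files))
           target = "{}.{}".format(self.schema, self.table) if self.schema else self.table
           sql = """
               COPY INTO {target}
               FROM @{stage}/
               {files}
               file_format={file_format}
           """.format(target=target, stage=self.stage, files=files_clause, file_format=self.file_format)
           SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id).run(sql)
   ```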





[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#discussion_r529280175



##########
File path: tests/providers/snowflake/transfers/test_s3_to_snowflake.py
##########
@@ -26,6 +26,41 @@
 class TestS3ToSnowflakeTransfer(unittest.TestCase):
     @mock.patch("airflow.providers.snowflake.hooks.snowflake.SnowflakeHook.run")
     def test_execute(self, mock_run):
+        table = 'table'
+        stage = 'stage'
+        file_format = 'file_format'
+        schema = 'schema'
+
+        S3ToSnowflakeOperator(
+            s3_keys=None,
+            table=table,
+            stage=stage,
+            file_format=file_format,
+            schema=schema,
+            columns_array=None,
+            task_id="task_id",
+            dag=None,
+        ).execute(None)
+
+        base_sql = """
+                FROM @{stage}/
+                
+                file_format={file_format}
+            """.format(
+            stage=stage, file_format=file_format
+        )
+
+        copy_query = """
+                COPY INTO {schema}.{table} {base_sql}
+            """.format(
+            schema=schema, table=table, base_sql=base_sql
+        )
+
+        assert mock_run.call_count == 1

Review comment:
       :)







[GitHub] [airflow] dstandish edited a comment on pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
dstandish edited a comment on pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#issuecomment-760392600


   One observation, which we might not want to deal with at this time, but I'll make it anyway:

   Snowflake supports Azure / S3 / GCS.

   This operator assumes a stage and not any particular storage backend, so I am pretty sure it works identically with GCP and Azure.

   So it might be that we should deprecate this and rename it, not as a transfer operator, but as a SnowflakeCopyOperator.

   We'd just need to rename s3_keys to files.

   Additionally, I think it should eventually be updated to parse the load results and fail as appropriate, which could depend on whether there are skipped or failed files and should be configurable.  But I'm sure you'll want to consider this out of scope for this change.
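
   For illustration, that result parsing could look roughly like this (a sketch; it assumes COPY INTO returns one row per file with the status in the second column, as described in the Snowflake docs - double-check the exact column layout and status values there):

   ```python
   from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook


   def run_copy_and_check(copy_sql: str, snowflake_conn_id: str = "snowflake_default",
                          fail_on_skipped: bool = True) -> None:
       hook = SnowflakeHook(snowflake_conn_id=snowflake_conn_id)
       # COPY INTO returns a result set with one row per file (file, status, rows_parsed, ...).
       rows = hook.get_records(copy_sql)
       bad_statuses = {"LOAD_FAILED", "PARTIALLY_LOADED"}
       if fail_on_skipped:
           bad_statuses.add("LOAD_SKIPPED")
       problems = [row for row in rows if str(row[1]).upper() in bad_statuses]
       if problems:
           raise ValueError("COPY INTO reported problems for {} file(s): {}".format(len(problems), problems))
   ```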





[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#discussion_r528560820



##########
File path: tests/providers/snowflake/transfers/test_s3_to_snowflake.py
##########
@@ -26,6 +26,41 @@
 class TestS3ToSnowflakeTransfer(unittest.TestCase):
     @mock.patch("airflow.providers.snowflake.hooks.snowflake.SnowflakeHook.run")
     def test_execute(self, mock_run):
+        table = 'table'
+        stage = 'stage'
+        file_format = 'file_format'
+        schema = 'schema'
+
+        S3ToSnowflakeOperator(
+            s3_keys=None,
+            table=table,
+            stage=stage,
+            file_format=file_format,
+            schema=schema,
+            columns_array=None,
+            task_id="task_id",
+            dag=None,
+        ).execute(None)
+
+        base_sql = """
+                FROM @{stage}/
+                
+                file_format={file_format}
+            """.format(
+            stage=stage, file_format=file_format
+        )
+
+        copy_query = """
+                COPY INTO {schema}.{table} {base_sql}
+            """.format(
+            schema=schema, table=table, base_sql=base_sql
+        )
+
+        assert mock_run.call_count == 1

Review comment:
       You can also use `mock_run.assert_called_once()`.







[GitHub] [airflow] dstandish edited a comment on pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
dstandish edited a comment on pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#issuecomment-761925481


   I would propose calling it `SnowflakeCopyOperator`.  And re keys, I would stick with the naming in Snowflake, which is `files`.
   
   And yes, this sounds great: stand up the new operator alongside the s3 one, refactor the s3 one to use the new one, and deprecate the s3 one.  I don't know if I'll have time to contribute, but I may try to squeeze it in.  I have worked out some good logic for parsing load results before.
   
   Re naming, I know you are partial to `Transfer` or `To`, but my view is that in this context it's not best thought of as a transfer.  It's just a Snowflake operation - an operator for running copy commands.





[GitHub] [airflow] dstandish commented on pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
dstandish commented on pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#issuecomment-761863984


   What if a full _100%_ of the code is the same?
   
   I believe 100% of the code can and should be the same, and anything else is unnecessary complexity.
   
   We simply don't need to make _any_ assumptions about storage.  See here: https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
   
   It's like having a MySqlInnoDBOperator and a MySqlMyISAMOperator; either way you're just executing SQL.
   
   Or perhaps a MySqlInnoDBToMyISAMOperator -- an artificial and unproductive distinction.
   
   At least that's _my_ take :) 
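
   To make it concrete: the generated statement only ever references a stage, and nothing in it is S3-specific (the stage and table names below are made up):

   ```python
   # Illustration only: the stage decides the storage backend (S3, GCS or Azure Blob);
   # the COPY INTO text itself is identical in every case.
   stage = "MY_DB.MY_SCHEMA.MY_STAGE"
   copy_sql = """
       COPY INTO MY_DB.MY_SCHEMA.MY_TABLE
       FROM @{stage}/
       file_format=(TYPE=CSV)
   """.format(stage=stage)
   ```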





[GitHub] [airflow] dstandish commented on pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
dstandish commented on pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#issuecomment-761925809


   Maybe better to call it a copy-into-table operator, since there is the reverse as well: copy into location...





[GitHub] [airflow] kurtqq commented on pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
kurtqq commented on pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#issuecomment-759784953


   Tested it and it seems to work fine.
   I hope it will be merged soon.
   





[GitHub] [airflow] dstandish edited a comment on pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
dstandish edited a comment on pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#issuecomment-761925809


   Maybe better to call it `SnowflakeCopyIntoTableOperator`, since there is also the reverse copy operation -- copy into location...  Or perhaps `SnowflakeLoadOperator`.





[GitHub] [airflow] sekikn commented on a change in pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
sekikn commented on a change in pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#discussion_r554579390



##########
File path: tests/providers/snowflake/transfers/test_s3_to_snowflake.py
##########
@@ -26,6 +26,42 @@
 class TestS3ToSnowflakeTransfer(unittest.TestCase):
     @mock.patch("airflow.providers.snowflake.hooks.snowflake.SnowflakeHook.run")
     def test_execute(self, mock_run):
+        table = 'table'
+        stage = 'stage'
+        file_format = 'file_format'
+        schema = 'schema'
+
+        S3ToSnowflakeOperator(
+            s3_keys=None,
+            table=table,
+            stage=stage,
+            file_format=file_format,
+            schema=schema,
+            columns_array=None,
+            task_id="task_id",
+            dag=None,
+        ).execute(None)
+
+        base_sql = """
+                FROM @{stage}/
+
+                file_format={file_format}
+            """.format(
+            stage=stage, file_format=file_format
+        )
+
+        copy_query = """
+                COPY INTO {schema}.{table} {base_sql}
+            """.format(
+            schema=schema, table=table, base_sql=base_sql
+        )
+        copy_query = "\n".join(line.strip() for line in copy_query.splitlines())
+
+        mock_run.assert_called_once()
+        assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], copy_query)
+
+    @mock.patch("airflow.providers.snowflake.hooks.snowflake.SnowflakeHook.run")
+    def test_execute_with_s3_keys(self, mock_run):

Review comment:
       Agreed, I'll update the test case to be parametrized.
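
   A sketch of how the parametrized version could look (using the `parameterized` package already used elsewhere in the Airflow test suite; the `files=` check for the s3_keys case is approximate and assumes the operator keeps its current `files=(...)` clause):

   ```python
   import unittest
   from unittest import mock

   from parameterized import parameterized

   from airflow.providers.snowflake.transfers.s3_to_snowflake import S3ToSnowflakeOperator


   class TestS3ToSnowflakeTransfer(unittest.TestCase):
       @parameterized.expand([(None,), (["1.csv", "2.csv"],)])
       @mock.patch("airflow.providers.snowflake.hooks.snowflake.SnowflakeHook.run")
       def test_execute(self, s3_keys, mock_run):
           # One test covering both the "no s3_keys" and "with s3_keys" cases.
           S3ToSnowflakeOperator(
               s3_keys=s3_keys,
               table="table",
               stage="stage",
               file_format="file_format",
               schema="schema",
               task_id="task_id",
           ).execute(None)

           mock_run.assert_called_once()
           sql = mock_run.call_args[0][0]
           assert "FROM @stage/" in sql
           if s3_keys:
               assert "files=" in sql
   ```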







[GitHub] [airflow] dstandish edited a comment on pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

Posted by GitBox <gi...@apache.org>.
dstandish edited a comment on pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#issuecomment-761838898


   @potiuk just a thing to keep in mind on this topic of transfer operators.  There is no s3 hook involved here.  This is a pure Snowflake operator.  It's just executing a Snowflake command.  So in that sense, to me it's not properly a transfer operator.  Yes, Snowflake is accessing storage, but the operator knows nothing about this.  And there's no reason to artificially restrict its focus to s3 in the naming.
   
   Though to be clear, I'm not saying this particular one shouldn't go through. I'm just making a case for fixing the naming.

