Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/10 14:58:43 UTC

[GitHub] [airflow] sekikn commented on a change in pull request #12505: Fix S3ToSnowflakeOperator to support uploading all files in the specified stage

sekikn commented on a change in pull request #12505:
URL: https://github.com/apache/airflow/pull/12505#discussion_r554579260



##########
File path: airflow/providers/snowflake/transfers/s3_to_snowflake.py
##########
@@ -71,16 +71,14 @@ def __init__(
     def execute(self, context: Any) -> None:
         snowflake_hook = SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id)
 
-        # Snowflake won't accept list of files it has to be tuple only.
-        # but in python tuple([1]) = (1,) => which is invalid for snowflake
-        files = str(self.s3_keys)
-        files = files.replace('[', '(')
-        files = files.replace(']', ')')
+        files = ""
+        if self.s3_keys:
+            files = "files=({})".format(", ".join(f"'{key}'" for key in self.s3_keys))
 
         # we can extend this based on stage
         base_sql = """
                     FROM @{stage}/

Review comment:
      Thanks! My first thought was that users could specify the prefix within the "stage" variable, but your design is actually cleaner. Just added it.
   I manually confirmed that the parameter works, as follows:
   
   Given an example S3 bucket with the following structure (each file contains five records),
   
   ```
   $ aws s3 ls --recursive s3://****/input
   2020-11-20 10:40:25          0 input/
   2021-01-10 23:04:49          0 input/subdir/
   2021-01-10 23:39:35        335 input/subdir/weather-sorted.avro
   2020-11-20 11:42:15        330 input/weather-snappy.avro
   2020-11-20 10:40:50        358 input/weather.avro
   ```
   
   and the following stage and target table:
   
   ```
   sekikn#COMPUTE_WH@FOO.BAR>DESC STAGE S3_STG;
   +--------------------+---------------------+---------------+------------------------------------------------+------------------+
   | parent_property    | property            | property_type | property_value                                 | property_default |
   |--------------------+---------------------+---------------+------------------------------------------------+------------------|
   
   (snip)
   
   | STAGE_COPY_OPTIONS | FORCE               | Boolean       | true                                           | false            |
   | STAGE_LOCATION     | URL                 | String        | ["s3://****/input/"]                           |                  |
   | STAGE_INTEGRATION  | STORAGE_INTEGRATION | String        | S3_INT                                         |                  |
   
   (snip)
   
   sekikn#COMPUTE_WH@FOO.BAR>DESC TABLE WEATHER;
   +------+---------+--------+-------+---------+-------------+------------+-------+------------+---------+
   | name | type    | kind   | null? | default | primary key | unique key | check | expression | comment |
   |------+---------+--------+-------+---------+-------------+------------+-------+------------+---------|
   | C    | VARIANT | COLUMN | Y     | NULL    | N           | N          | NULL  | NULL       | NULL    |
   +------+---------+--------+-------+---------+-------------+------------+-------+------------+---------+
   1 Row(s) produced. Time Elapsed: 0.568s
   sekikn#COMPUTE_WH@FOO.BAR>SELECT COUNT(*) FROM WEATHER;
   +----------+                                                                    
   | COUNT(*) |
   |----------|
   |        0 |
   +----------+
   1 Row(s) produced. Time Elapsed: 0.575s
   ```
   
    First, I ran S3ToSnowflakeOperator without a prefix:
   
   ```
   In [1]: from airflow.providers.snowflake.transfers.s3_to_snowflake import S3ToSnowflakeOperator
   /home/sekikn/venv/a/lib/python3.8/site-packages/snowflake/connector/options.py:78 UserWarning: You have an incompatible version of 'pyarrow' installed (2.0.0), please install a version that adheres to: 'pyarrow<0.18.0,>=0.17.0; extra == "pandas"'
   
   In [2]: t = S3ToSnowflakeOperator(stage="FOO.BAR.S3_STG", file_format="(TYPE=AVRO)", schema="FOO.BAR", table="WEATHER", task_id="tid")
   
   In [3]: t.execute(None)
   [2021-01-10 23:46:01,029] {connection.py:206} INFO - Snowflake Connector for Python Version: 2.3.6, Python Version: 3.8.5, Platform: Linux-5.8.0-36-generic-x86_64-with-glibc2.29
   [2021-01-10 23:46:01,030] {connection.py:743} INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
   [2021-01-10 23:46:01,030] {connection.py:759} INFO - Setting use_openssl_only mode to False
   [2021-01-10 23:46:01,783] {cursor.py:530} INFO - query: [ALTER SESSION SET autocommit=True]
   [2021-01-10 23:46:01,927] {cursor.py:553} INFO - query execution done
   [2021-01-10 23:46:01,927] {dbapi.py:180} INFO - Running statement: 
   COPY INTO FOO.BAR.WEATHER
   FROM @FOO.BAR.S3_STG/
   
   file_format=(TYPE=AVRO)
   
   , parameters: None
   [2021-01-10 23:46:01,928] {cursor.py:530} INFO - query: [COPY INTO FOO.BAR.WEATHER FROM @FOO.BAR.S3_STG/  file_format=(TYPE=AVRO)]
   [2021-01-10 23:46:02,657] {cursor.py:553} INFO - query execution done
   [2021-01-10 23:46:02,658] {dbapi.py:186} INFO - Rows affected: 3
   [2021-01-10 23:46:02,659] {connection.py:430} INFO - closed
   ```
   
    and verified via snowsql that all 15 records were uploaded correctly:
   
   ```
   sekikn#COMPUTE_WH@FOO.BAR>SELECT COUNT(*) FROM WEATHER;
   +----------+                                                                    
   | COUNT(*) |
   |----------|
   |       15 |
   +----------+
   1 Row(s) produced. Time Elapsed: 1.126s
   ```
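For reference, the two runs differ only in the path appended after the stage name. Here is a minimal sketch reproducing the shape of the logged queries (`build_copy_sql` is a hypothetical helper for illustration, not the operator's actual method):

```python
def build_copy_sql(stage: str, schema: str, table: str,
                   file_format: str, prefix: str = "") -> str:
    # Mirrors the single-line query shape from the logs above: the
    # optional prefix is appended directly after "@{stage}/", narrowing
    # the COPY to that sub-path of the stage location.
    return f"COPY INTO {schema}.{table} FROM @{stage}/{prefix} file_format={file_format}"

# Without a prefix, the whole stage location is scanned:
print(build_copy_sql("FOO.BAR.S3_STG", "FOO.BAR", "WEATHER", "(TYPE=AVRO)"))
# With prefix="subdir", only files under that sub-path are copied:
print(build_copy_sql("FOO.BAR.S3_STG", "FOO.BAR", "WEATHER", "(TYPE=AVRO)", prefix="subdir"))
```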
   
    Then, I ran S3ToSnowflakeOperator again, this time **with** a prefix:
   
   ```
   In [4]: t = S3ToSnowflakeOperator(stage="FOO.BAR.S3_STG", prefix="subdir", file_format="(TYPE=AVRO)", schema="FOO.BAR", table="WEATHER", task_id="tid")
   
   In [5]: t.execute(None)
   [2021-01-10 23:47:56,478] {connection.py:206} INFO - Snowflake Connector for Python Version: 2.3.6, Python Version: 3.8.5, Platform: Linux-5.8.0-36-generic-x86_64-with-glibc2.29
   [2021-01-10 23:47:56,478] {connection.py:743} INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
   [2021-01-10 23:47:57,155] {cursor.py:530} INFO - query: [ALTER SESSION SET autocommit=True]
   [2021-01-10 23:47:57,290] {cursor.py:553} INFO - query execution done
   [2021-01-10 23:47:57,290] {dbapi.py:180} INFO - Running statement: 
   COPY INTO FOO.BAR.WEATHER
   FROM @FOO.BAR.S3_STG/subdir
   
   file_format=(TYPE=AVRO)
   
   , parameters: None
   [2021-01-10 23:47:57,291] {cursor.py:530} INFO - query: [COPY INTO FOO.BAR.WEATHER FROM @FOO.BAR.S3_STG/subdir  file_format=(TYPE=AVRO)]
   [2021-01-10 23:47:57,947] {cursor.py:553} INFO - query execution done
   [2021-01-10 23:47:57,948] {dbapi.py:186} INFO - Rows affected: 1
   [2021-01-10 23:47:57,948] {connection.py:430} INFO - closed
   ```
   
    and verified that only the five records under the prefix were added this time:
   
   ```
   sekikn#COMPUTE_WH@FOO.BAR>SELECT COUNT(*) FROM WEATHER;
   +----------+                                                                    
   | COUNT(*) |
   |----------|
   |       20 |
   +----------+
   1 Row(s) produced. Time Elapsed: 0.628s
   ```
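As a side note, the files clause built by the new code in the diff can be exercised in isolation. This sketch lifts those three lines into a standalone function to show why an omitted or empty `s3_keys` now falls through to the whole-stage behavior demonstrated above:

```python
def files_clause(s3_keys):
    # Same logic as the diff: when no keys are given the clause stays
    # empty, so the COPY statement covers every file under the stage
    # (plus optional prefix). With keys, each one is quoted and joined.
    files = ""
    if s3_keys:
        files = "files=({})".format(", ".join(f"'{key}'" for key in s3_keys))
    return files

print(files_clause(None))                  # empty -> copy the whole stage
print(files_clause(["weather.avro"]))      # single key, no trailing comma
print(files_clause(["a.avro", "b.avro"]))  # multiple keys
```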




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org