Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/02/24 10:56:09 UTC

[GitHub] [airflow] JavierLopezT opened a new pull request #14415: SnowflakeToS3Operator

JavierLopezT opened a new pull request #14415:
URL: https://github.com/apache/airflow/pull/14415


   @feluelle @turbaszek I closed the other PR because I messed it up during the rebase.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
dstandish commented on pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#issuecomment-876674001


   > @dstandish - does it address your concerns :) ?
   
   I think this should be storage-backend agnostic, e.g. SnowflakeCopyIntoLocationOperator.
   
   With the same amount of effort it takes to build and maintain the present operator, we could instead support any storage backend.  It just requires some modest changes to make it more generic.
   
   I suggested this before but @JavierLopezT does not appear to agree 🤷‍♂️
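
To make the suggestion concrete, here is a minimal sketch of how a storage-agnostic operator might assemble its statement. The helper name `build_copy_into_location` and all of its parameters are hypothetical and do not appear in the PR; only the `COPY INTO <location>` form and the `STORAGE_INTEGRATION`, `FILE_FORMAT` and `HEADER` clauses are taken from the diff under review.

```python
from typing import List, Optional


def build_copy_into_location(
    location: str,
    source: str,
    storage_integration: Optional[str] = None,
    file_format: Optional[str] = None,
    copy_options: Optional[List[str]] = None,
    include_header: bool = False,
) -> str:
    """Build a COPY INTO <location> statement (hypothetical helper, not from the PR).

    ``location`` is either a stage reference such as ``@my_stage/path/`` or an
    external URL. ``source`` is passed through verbatim, so the caller supplies
    either a table name or a query already wrapped in parentheses.
    """
    # Stage references are written bare; external URLs are quoted, as in the diff.
    target = location if location.startswith("@") else f"'{location}'"
    lines = [f"COPY INTO {target}", f"FROM {source}"]
    if storage_integration:
        lines.append(f"STORAGE_INTEGRATION = {storage_integration}")
    if file_format:
        lines.append(f"FILE_FORMAT = ({file_format})")
    # Extra options (SINGLE, MAX_FILE_SIZE, OVERWRITE, ...) are appended as given.
    lines.extend(copy_options or [])
    lines.append(f"HEADER = {'TRUE' if include_header else 'FALSE'}")
    return "\n".join(lines) + ";"
```

With this shape nothing in the helper is specific to S3: the bucket and key collapse into a single `location` string, and the integration name becomes a parameter instead of the hard-coded `S3` in the diff.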


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
dstandish commented on pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#issuecomment-894094479


   > My belief is that it is easier to import SnowflakeToS3 from amazon.aws.transfers, like all the other operators that upload data to S3, than from snowflake.transfers.
   
   I think the easiest thing is having exactly _one_ snowflake copy into location operator and exactly _one_ place from which to import it.
   
   And I'd say it's not really a transfer.  It's just executing SQL.  It's purely an interaction with Snowflake, and Snowflake is handling the "transferring" internally.  E.g. we're not pulling the rows out through the connector into Airflow, writing them out and uploading them to S3 -- _that_ would be a transfer.








[GitHub] [airflow] dstandish commented on a change in pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#discussion_r583737533



##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type warehouse: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param query_or_table: query or full table to unload. If table, it must include the schema like
+        `schema.table`
+    :type query_or_table: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,
+        OVERWRITE etc). Each element of the list has to be a string
+    :type unload_options: list
+    :param role: name of role (will overwrite any role defined in
+        connection's extra JSON)
+    :type role: str
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identify provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :type authenticator: str
+    :param session_parameters: You can set session-level parameters at
+        the time you connect to Snowflake
+    :type session_parameters: dict
+    """
+
+    template_fields = (
+        's3_key',
+        's3_bucket',
+        'table_or_query',
+        'snowflake_conn_id',
+    )
+    template_ext = ('.sql',)
+    ui_color = '#ffebb2'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        stage: Optional[str] = None,
+        s3_bucket: Optional[str],
+        s3_key: str,
+        query_or_table: str,
+        file_format: Optional[str],
+        database: Optional[str] = None,
+        warehouse: Optional[str] = None,
+        snowflake_conn_id: str = "snowflake_default",
+        unload_options: Optional[list] = None,
+        autocommit: bool = True,
+        include_header: bool = False,
+        role: Optional[str] = None,
+        authenticator: Optional[str] = None,
+        session_parameters: Optional[dict] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.stage = stage
+        self.s3_bucket = s3_bucket
+        self.s3_key = s3_key
+        self.file_format = file_format
+        self.warehouse = warehouse
+        self.database = database
+        self.snowflake_conn_id = snowflake_conn_id
+        self.query_or_table = query_or_table
+        self.unload_options = unload_options or []  # type: List
+        self.autocommit = autocommit
+        self.include_header = include_header
+        self.role = role
+        self.authenticator = authenticator
+        self.session_parameters = session_parameters
+
+    def execute(self, context):
+        snowflake_hook = SnowflakeHook(
+            snowflake_conn_id=self.snowflake_conn_id,
+            warehouse=self.warehouse,
+            database=self.database,
+            role=self.role,
+            authenticator=self.authenticator,
+            session_parameters=self.session_parameters,
+        )
+        unload_options = '\n\t'.join(self.unload_options)
+
+        if self.stage:
+            unload_query = f"""
+            COPY INTO @{self.stage}/{self.s3_key}
+            FROM ({self.query_or_table})
+            """
+
+            if self.file_format:
+                unload_query += f'FILE_FORMAT = ({self.file_format})'
+
+        else:
+            unload_query = f"""
+            COPY INTO 's3://{self.s3_bucket}/{self.s3_key}'
+            FROM ({self.query_or_table})
+            STORAGE_INTEGRATION = S3
+            FILE_FORMAT = ({self.file_format})
+            """
+
+        unload_query += f"""{unload_options}
+                            HEADER = {self.include_header};"""
+
+        self.log.info('Executing UNLOAD command...')
+        snowflake_hook.run(unload_query, self.autocommit)

Review comment:
       I recommend using `cur.execute` instead, so that you can call `cur.fetchall` afterwards and print the output to the logs.
   
   Snowflake tells you which files it created in the unload, and this is useful logging information.
   
   Same as I suggested for the Snowflake load scenario.
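
A minimal sketch of that pattern, written against the generic Python DB-API so that it runs without Snowflake. The function name `execute_and_log_rows` is hypothetical; in the operator the connection would presumably come from `snowflake_hook.get_conn()`, and the columns Snowflake actually returns for an unload are not assumed here.

```python
import logging
import sqlite3
from typing import Any, List, Tuple

log = logging.getLogger(__name__)


def execute_and_log_rows(conn: Any, sql: str) -> List[Tuple]:
    """Run one statement through a DB-API cursor and log every row it returns."""
    cur = conn.cursor()
    try:
        cur.execute(sql)
        rows = cur.fetchall()
    finally:
        # Always release the cursor, even if the statement fails.
        cur.close()
    for row in rows:
        log.info("Statement result: %s", row)
    return rows


if __name__ == "__main__":
    # sqlite3 is only a stand-in for a Snowflake connection, and the row
    # below is made-up demo data, not real unload output.
    logging.basicConfig(level=logging.INFO)
    demo_conn = sqlite3.connect(":memory:")
    execute_and_log_rows(demo_conn, "SELECT 'example_file' AS name, 1 AS n")
```

Returning the rows as well as logging them leaves room for the operator to push them to XCom later, if that turns out to be useful.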







[GitHub] [airflow] dstandish commented on a change in pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#discussion_r583735445



##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type warehouse: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param query_or_table: query or full table to unload. If table, it must include the schema like
+        `schema.table`
+    :type query_or_table: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,
+        OVERWRITE etc). Each element of the list has to be a string
+    :type unload_options: list
+    :param role: name of role (will overwrite any role defined in
+        connection's extra JSON)
+    :type role: str
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identify provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :type authenticator: str
+    :param session_parameters: You can set session-level parameters at
+        the time you connect to Snowflake
+    :type session_parameters: dict
+    """
+
+    template_fields = (
+        's3_key',
+        's3_bucket',
+        'table_or_query',
+        'snowflake_conn_id',
+    )
+    template_ext = ('.sql',)
+    ui_color = '#ffebb2'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        stage: Optional[str] = None,
+        s3_bucket: Optional[str],
+        s3_key: str,
+        query_or_table: str,
+        file_format: Optional[str],

Review comment:
       I would suggest making file format classes.  File formats can get pretty involved, and classes would make them easier for users to work with.
   
   E.g. the CSV file format needs headers, and JSON might have other config options; these can be init params of the format class.  The classes could potentially be reused in both load and unload operations.  The base format class defines the attributes made available to the operator when building out the SQL etc.
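
A rough sketch of what such classes could look like. Every class and method name below (`FileFormat`, `CsvFormat`, `JsonFormat`, `options`, `to_sql`) is hypothetical and not part of the PR; the option names `TYPE`, `COMPRESSION` and `FIELD_DELIMITER` are Snowflake file format options, but the selection here is illustrative rather than complete.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class FileFormat:
    """Hypothetical base class: holds options shared by every format."""

    compression: Optional[str] = None

    # Not annotated, so this is a plain class attribute, not a dataclass field.
    format_type = ""

    def options(self) -> Dict[str, str]:
        opts = {"TYPE": self.format_type}
        if self.compression:
            opts["COMPRESSION"] = self.compression
        return opts

    def to_sql(self) -> str:
        """Render the options as the body of a FILE_FORMAT = (...) clause."""
        return " ".join(f"{key} = {value}" for key, value in self.options().items())


@dataclass
class CsvFormat(FileFormat):
    field_delimiter: str = ","

    format_type = "CSV"

    def options(self) -> Dict[str, str]:
        opts = super().options()
        # No escaping is done here; a real implementation would need to
        # handle quotes inside the delimiter.
        opts["FIELD_DELIMITER"] = f"'{self.field_delimiter}'"
        return opts


@dataclass
class JsonFormat(FileFormat):
    format_type = "JSON"
```

An operator could then accept either a plain string or one of these objects and call `to_sql()` when building the statement, for example `CsvFormat(field_delimiter="|", compression="GZIP").to_sql()`.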
   
   







[GitHub] [airflow] dstandish edited a comment on pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
dstandish edited a comment on pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#issuecomment-894094479


   > My belief is that it is easier to import SnowflakeToS3 from amazon.aws.transfers, like all the other operators that upload data to S3, than from snowflake.transfers.
   
   I think the easiest thing is having exactly _one_ snowflake copy into location operator and exactly _one_ place from which to import it.
   
   I wouldn't say it's impossible that there could be utility in having a subclass that is storage-specific.  But I think the first thing is producing the generic version, and then we'll be in a better place to evaluate whether the subclass is worth its weight.
   
   And I'd say it's not really a transfer.  It's just executing SQL.  It's purely an interaction with Snowflake, and Snowflake is handling the "transferring" internally.  E.g. we're not pulling the rows out through the connector into Airflow, writing them out and uploading them to S3 -- _that_ would be more of a transfer.





[GitHub] [airflow] turbaszek commented on a change in pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#discussion_r582220072



##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type warehouse: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param query_or_table: query or full table to unload. If table, it must include the schema like
+        `schema.table`
+    :type query_or_table: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,
+        OVERWRITE etc). Each element of the list has to be a string
+    :type unload_options: list
+    :param role: name of role (will overwrite any role defined in
+        connection's extra JSON)
+    :type role: str
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identify provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :type authenticator: str
+    :param session_parameters: You can set session-level parameters at
+        the time you connect to Snowflake
+    :type session_parameters: dict
+    """
+
+    template_fields = (
+        's3_key',
+        's3_bucket',
+        'table_or_query',
+        'snowflake_conn_id',
+    )
+    template_ext = ('.sql',)

Review comment:
       ```suggestion
       template_ext = ('.sql',)
       template_fields_renderers = {"query_or_table": "sql"}
   ```
   WDYT?
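
Related to this suggestion: the diff lists `'table_or_query'` in `template_fields`, while the constructor stores the value as `query_or_table`. As far as I know, Airflow resolves each template field by attribute name on the operator, so the two need to match. The sketch below uses a stand-in class (`FakeOperator`, hypothetical, no Airflow import) and a hypothetical helper `missing_template_attrs` to show how such a mismatch could be caught in a unit test.

```python
from typing import Iterable, List


def missing_template_attrs(obj: object, names: Iterable[str]) -> List[str]:
    """Return the names that are not attributes of obj, in the order given."""
    return [name for name in names if not hasattr(obj, name)]


class FakeOperator:
    """Stand-in that mirrors the names used in the diff; not an Airflow class."""

    template_fields = ("s3_key", "s3_bucket", "table_or_query", "snowflake_conn_id")
    template_fields_renderers = {"query_or_table": "sql"}

    def __init__(self, s3_key, s3_bucket, query_or_table, snowflake_conn_id="snowflake_default"):
        self.s3_key = s3_key
        self.s3_bucket = s3_bucket
        self.query_or_table = query_or_table
        self.snowflake_conn_id = snowflake_conn_id


if __name__ == "__main__":
    op = FakeOperator("key", "bucket", "SELECT 1")
    # Reports the template field that has no matching attribute.
    print(missing_template_attrs(op, op.template_fields))
```

Renaming the entry in `template_fields` to `query_or_table` would make it agree with both the attribute and the renderer key proposed above.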







[GitHub] [airflow] github-actions[bot] commented on pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#issuecomment-785232561


   The PR is likely OK to be merged with just a subset of tests for default Python and Database versions, without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full test matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest master or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] JavierLopezT commented on a change in pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
JavierLopezT commented on a change in pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#discussion_r591757167



##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type warehouse: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param query_or_table: query or full table to unload. If table, it must include the schema like
+        `schema.table`
+    :type query_or_table: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,
+        OVERWRITE etc). Each element of the list has to be a string
+    :type unload_options: list
+    :param role: name of role (will overwrite any role defined in
+        connection's extra JSON)
+    :type role: str
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identify provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :type authenticator: str
+    :param session_parameters: You can set session-level parameters at
+        the time you connect to Snowflake
+    :type session_parameters: dict
+    """
+
+    template_fields = (
+        's3_key',
+        's3_bucket',
+        'table_or_query',
+        'snowflake_conn_id',
+    )
+    template_ext = ('.sql',)
+    ui_color = '#ffebb2'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        stage: Optional[str] = None,
+        s3_bucket: Optional[str],
+        s3_key: str,
+        query_or_table: str,
+        file_format: Optional[str],

Review comment:
       I don't see the advantage of creating a class over having a long (and ugly) string or a file_format created in the Snowflake UI. Could you elaborate, please?
   
   Anyway, I would leave that for another PR so that this one doesn't drag on forever.

##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):

Review comment:
       I would rather keep this as a specific operator for now. I have never used either Azure or Google Cloud, so I am not able to document the operator for them. It would be nice if someone took this operator in the future and made a more generic version, though.

##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type database: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param query_or_table: query or full table to unload. If table, it must include the schema like
+        `schema.table`
+    :type query_or_table: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,
+        OVERWRITE etc). Each element of the list has to be a string
+    :type unload_options: list
+    :param role: name of role (will overwrite any role defined in
+        connection's extra JSON)
+    :type role: str
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identity provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :type authenticator: str
+    :param session_parameters: You can set session-level parameters at
+        the time you connect to Snowflake
+    :type session_parameters: dict
+    """
+
+    template_fields = (
+        's3_key',
+        's3_bucket',
+        'query_or_table',
+        'snowflake_conn_id',
+    )
+    template_ext = ('.sql',)
+    ui_color = '#ffebb2'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        stage: Optional[str] = None,
+        s3_bucket: Optional[str],
+        s3_key: str,
+        query_or_table: str,
+        file_format: Optional[str],
+        database: Optional[str] = None,
+        warehouse: Optional[str] = None,
+        snowflake_conn_id: str = "snowflake_default",
+        unload_options: Optional[list] = None,
+        autocommit: bool = True,
+        include_header: bool = False,
+        role: Optional[str] = None,
+        authenticator: Optional[str] = None,
+        session_parameters: Optional[dict] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.stage = stage
+        self.s3_bucket = s3_bucket
+        self.s3_key = s3_key
+        self.file_format = file_format
+        self.warehouse = warehouse
+        self.database = database
+        self.snowflake_conn_id = snowflake_conn_id
+        self.query_or_table = query_or_table
+        self.unload_options = unload_options or []  # type: List
+        self.autocommit = autocommit
+        self.include_header = include_header
+        self.role = role
+        self.authenticator = authenticator
+        self.session_parameters = session_parameters
+
+    def execute(self, context):
+        snowflake_hook = SnowflakeHook(
+            snowflake_conn_id=self.snowflake_conn_id,
+            warehouse=self.warehouse,
+            database=self.database,
+            role=self.role,
+            authenticator=self.authenticator,
+            session_parameters=self.session_parameters,
+        )
+        unload_options = '\n\t'.join(self.unload_options)
+
+        if self.stage:
+            unload_query = f"""
+            COPY INTO @{self.stage}/{self.s3_key}
+            FROM ({self.query_or_table})
+            """

Review comment:
       I don't think so

##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type database: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake

Review comment:
       I agree with path or prefix. I only named it key because that is the name used in RedshiftToS3 as well.

##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type database: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param query_or_table: query or full table to unload. If table, it must include the schema like
+        `schema.table`
+    :type query_or_table: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,
+        OVERWRITE etc). Each element of the list has to be a string
+    :type unload_options: list
+    :param role: name of role (will overwrite any role defined in
+        connection's extra JSON)
+    :type role: str
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identity provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :type authenticator: str
+    :param session_parameters: You can set session-level parameters at
+        the time you connect to Snowflake
+    :type session_parameters: dict
+    """
+
+    template_fields = (
+        's3_key',
+        's3_bucket',
+        'query_or_table',
+        'snowflake_conn_id',
+    )
+    template_ext = ('.sql',)
+    ui_color = '#ffebb2'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        stage: Optional[str] = None,
+        s3_bucket: Optional[str],
+        s3_key: str,
+        query_or_table: str,
+        file_format: Optional[str],
+        database: Optional[str] = None,
+        warehouse: Optional[str] = None,
+        snowflake_conn_id: str = "snowflake_default",
+        unload_options: Optional[list] = None,
+        autocommit: bool = True,
+        include_header: bool = False,
+        role: Optional[str] = None,
+        authenticator: Optional[str] = None,

Review comment:
       I don't have a strong opinion on that. 
   
   I will remove autocommit though

##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type database: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param query_or_table: query or full table to unload. If table, it must include the schema like
+        `schema.table`
+    :type query_or_table: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,
+        OVERWRITE etc). Each element of the list has to be a string
+    :type unload_options: list
+    :param role: name of role (will overwrite any role defined in
+        connection's extra JSON)
+    :type role: str
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identity provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :type authenticator: str
+    :param session_parameters: You can set session-level parameters at
+        the time you connect to Snowflake
+    :type session_parameters: dict
+    """
+
+    template_fields = (
+        's3_key',
+        's3_bucket',
+        'query_or_table',
+        'snowflake_conn_id',
+    )
+    template_ext = ('.sql',)
+    ui_color = '#ffebb2'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        stage: Optional[str] = None,
+        s3_bucket: Optional[str],
+        s3_key: str,
+        query_or_table: str,
+        file_format: Optional[str],
+        database: Optional[str] = None,
+        warehouse: Optional[str] = None,
+        snowflake_conn_id: str = "snowflake_default",
+        unload_options: Optional[list] = None,
+        autocommit: bool = True,
+        include_header: bool = False,

Review comment:
       And if there is no such class? Shall I make it optional or completely remove it?







[GitHub] [airflow] JavierLopezT edited a comment on pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
JavierLopezT edited a comment on pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#issuecomment-894080303


   Hello @potiuk and @dstandish. I guess I can try to create a generic operator. However, I think it would be nice to keep the specific operators as classes derived from the generic one, so they can still be imported from their own provider. I believe it is easier to import SnowflakeToS3 from amazon.aws.transfers, like all the other operators that upload data to S3, than from snowflake.transfers. That way you can also implement small backend-specific details without repeating code, like:
   ```
   COPY INTO 's3://{self.s3_bucket}/{self.s3_path}'
   FROM {self.sql}
   STORAGE_INTEGRATION = S3
   ```
   
   What do you think?
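
To make the proposal concrete, here is a minimal sketch of how a generic base and a thin S3-specific subclass could fit together. It only builds the SQL string and does not touch Airflow or Snowflake. The class names `CopyIntoLocationSketch` and `SnowflakeToS3Sketch`, the `location()` / `extra_clauses()` hooks and the `storage_integration` argument are all hypothetical names chosen for illustration; none of them come from this PR.

```python
from typing import List, Optional


class CopyIntoLocationSketch:
    """Hypothetical generic base: assembles a Snowflake COPY INTO <location> statement."""

    def __init__(
        self,
        sql: str,
        file_format: Optional[str] = None,
        unload_options: Optional[List[str]] = None,
    ) -> None:
        # `sql` is either a table name or an already parenthesised query.
        self.sql = sql
        self.file_format = file_format
        self.unload_options = unload_options or []

    def location(self) -> str:
        """Return the target location; each backend-specific subclass provides it."""
        raise NotImplementedError

    def extra_clauses(self) -> List[str]:
        """Backend-specific clauses; nothing by default."""
        return []

    def build_query(self) -> str:
        lines = [f"COPY INTO {self.location()}", f"FROM {self.sql}"]
        lines += self.extra_clauses()
        if self.file_format:
            lines.append(f"FILE_FORMAT = ({self.file_format})")
        lines += self.unload_options
        return "\n".join(lines)


class SnowflakeToS3Sketch(CopyIntoLocationSketch):
    """Hypothetical S3 flavour: only the location and the integration clause differ."""

    def __init__(self, s3_bucket: str, s3_path: str, storage_integration: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.s3_bucket = s3_bucket
        self.s3_path = s3_path
        self.storage_integration = storage_integration

    def location(self) -> str:
        return f"'s3://{self.s3_bucket}/{self.s3_path}'"

    def extra_clauses(self) -> List[str]:
        return [f"STORAGE_INTEGRATION = {self.storage_integration}"]


if __name__ == "__main__":
    op = SnowflakeToS3Sketch(
        s3_bucket="my-bucket",
        s3_path="exports/orders",
        storage_integration="S3",
        sql="analytics.orders",
    )
    print(op.build_query())
```

With this split, a variant for another storage backend would only need its own `location()` and `extra_clauses()`, while the provider-specific import path discussed above could stay as it is.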





[GitHub] [airflow] dstandish commented on a change in pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#discussion_r583713306



##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type database: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param query_or_table: query or full table to unload. If table, it must include the schema like
+        `schema.table`
+    :type query_or_table: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,
+        OVERWRITE etc). Each element of the list has to be a string
+    :type unload_options: list
+    :param role: name of role (will overwrite any role defined in
+        connection's extra JSON)
+    :type role: str
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identity provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :type authenticator: str
+    :param session_parameters: You can set session-level parameters at
+        the time you connect to Snowflake
+    :type session_parameters: dict
+    """
+
+    template_fields = (
+        's3_key',
+        's3_bucket',
+        'query_or_table',
+        'snowflake_conn_id',
+    )
+    template_ext = ('.sql',)
+    ui_color = '#ffebb2'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        stage: Optional[str] = None,
+        s3_bucket: Optional[str],
+        s3_key: str,
+        query_or_table: str,
+        file_format: Optional[str],
+        database: Optional[str] = None,
+        warehouse: Optional[str] = None,
+        snowflake_conn_id: str = "snowflake_default",
+        unload_options: Optional[list] = None,
+        autocommit: bool = True,
+        include_header: bool = False,
+        role: Optional[str] = None,
+        authenticator: Optional[str] = None,

Review comment:
       I think it might help readability (and be more future-proof) if we consolidate the hook kwargs into a `hook_kwargs` param.  What do y'all think?
   
   This would separate the configuration necessary for the functioning of this operator from the configuration that _should_ be handled by the Airflow connection but may be overridden.  E.g. I am thinking session_parameters and authenticator would go in the hook kwargs dict.  autocommit would be another, but curiously it's not a hook param. (That said, I don't think autocommit would have any effect whatsoever on an unload operation, so it can probably be removed here.)
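
As a rough illustration of the `hook_kwargs` idea, the sketch below keeps the operator's own parameters explicit and forwards everything connection-related through one dict. `FakeSnowflakeHook` is a stand-in so the example runs without Airflow installed, and `UnloadOperatorSketch` and `get_hook()` are hypothetical names; this is an assumption about how the consolidation could look, not code from the PR.

```python
from typing import Any, Dict, Optional


class FakeSnowflakeHook:
    """Stand-in for SnowflakeHook (assumption: it accepts override keyword arguments)."""

    def __init__(self, snowflake_conn_id: str = "snowflake_default", **kwargs: Any) -> None:
        self.snowflake_conn_id = snowflake_conn_id
        # Values that would override what is stored on the Airflow connection.
        self.overrides = kwargs


class UnloadOperatorSketch:
    """Hypothetical operator: operator config is explicit, hook config is one dict."""

    def __init__(
        self,
        *,
        s3_key: str,
        query_or_table: str,
        snowflake_conn_id: str = "snowflake_default",
        hook_kwargs: Optional[Dict[str, Any]] = None,
    ) -> None:
        self.s3_key = s3_key
        self.query_or_table = query_or_table
        self.snowflake_conn_id = snowflake_conn_id
        # Copy the dict so later mutation by the caller does not leak in.
        self.hook_kwargs = dict(hook_kwargs or {})

    def get_hook(self) -> FakeSnowflakeHook:
        # role, authenticator, session_parameters, etc. all travel through hook_kwargs.
        return FakeSnowflakeHook(snowflake_conn_id=self.snowflake_conn_id, **self.hook_kwargs)


if __name__ == "__main__":
    op = UnloadOperatorSketch(
        s3_key="exports/orders",
        query_or_table="analytics.orders",
        hook_kwargs={"authenticator": "externalbrowser"},
    )
    print(op.get_hook().overrides)
```

One consequence of this shape is that a new hook parameter would not require a matching change to the operator signature, which seems to be the future-proofing described above.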

##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type database: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param query_or_table: query or full table to unload. If table, it must include the schema like
+        `schema.table`
+    :type query_or_table: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,
+        OVERWRITE etc). Each element of the list has to be a string
+    :type unload_options: list
+    :param role: name of role (will overwrite any role defined in
+        connection's extra JSON)
+    :type role: str
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identity provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :type authenticator: str
+    :param session_parameters: You can set session-level parameters at
+        the time you connect to Snowflake
+    :type session_parameters: dict
+    """
+
+    template_fields = (
+        's3_key',
+        's3_bucket',
+        'query_or_table',
+        'snowflake_conn_id',
+    )
+    template_ext = ('.sql',)
+    ui_color = '#ffebb2'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        stage: Optional[str] = None,
+        s3_bucket: Optional[str],
+        s3_key: str,
+        query_or_table: str,
+        file_format: Optional[str],
+        database: Optional[str] = None,
+        warehouse: Optional[str] = None,
+        snowflake_conn_id: str = "snowflake_default",
+        unload_options: Optional[list] = None,
+        autocommit: bool = True,
+        include_header: bool = False,
+        role: Optional[str] = None,
+        authenticator: Optional[str] = None,

Review comment:
       I think it might help readability (and be more future-proof) if we consolidate the hook kwargs into a `hook_kwargs` param.  What do y'all think?
   
   This would separate the configuration necessary for the functioning of this operator from the configuration that _should_ be handled by the Airflow connection but may be overridden.  E.g. I am thinking session_parameters and authenticator would go in the hook kwargs dict.  autocommit would be another, but curiously it's not a hook param. (That said, I don't think autocommit would have any effect whatsoever on an unload operation, so it can probably be removed here anyway.)







[GitHub] [airflow] dstandish commented on a change in pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#discussion_r583737533



##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type database: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param query_or_table: query or full table to unload. If table, it must include the schema like
+        `schema.table`
+    :type query_or_table: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,
+        OVERWRITE etc). Each element of the list has to be a string
+    :type unload_options: list
+    :param role: name of role (will overwrite any role defined in
+        connection's extra JSON)
+    :type role: str
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identity provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :type authenticator: str
+    :param session_parameters: You can set session-level parameters at
+        the time you connect to Snowflake
+    :type session_parameters: dict
+    """
+
+    template_fields = (
+        's3_key',
+        's3_bucket',
+        'query_or_table',
+        'snowflake_conn_id',
+    )
+    template_ext = ('.sql',)
+    ui_color = '#ffebb2'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        stage: Optional[str] = None,
+        s3_bucket: Optional[str],
+        s3_key: str,
+        query_or_table: str,
+        file_format: Optional[str],
+        database: Optional[str] = None,
+        warehouse: Optional[str] = None,
+        snowflake_conn_id: str = "snowflake_default",
+        unload_options: Optional[list] = None,
+        autocommit: bool = True,
+        include_header: bool = False,
+        role: Optional[str] = None,
+        authenticator: Optional[str] = None,
+        session_parameters: Optional[dict] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.stage = stage
+        self.s3_bucket = s3_bucket
+        self.s3_key = s3_key
+        self.file_format = file_format
+        self.warehouse = warehouse
+        self.database = database
+        self.snowflake_conn_id = snowflake_conn_id
+        self.query_or_table = query_or_table
+        self.unload_options = unload_options or []  # type: List
+        self.autocommit = autocommit
+        self.include_header = include_header
+        self.role = role
+        self.authenticator = authenticator
+        self.session_parameters = session_parameters
+
+    def execute(self, context):
+        snowflake_hook = SnowflakeHook(
+            snowflake_conn_id=self.snowflake_conn_id,
+            warehouse=self.warehouse,
+            database=self.database,
+            role=self.role,
+            authenticator=self.authenticator,
+            session_parameters=self.session_parameters,
+        )
+        unload_options = '\n\t'.join(self.unload_options)
+
+        if self.stage:
+            unload_query = f"""
+            COPY INTO @{self.stage}/{self.s3_key}
+            FROM ({self.query_or_table})
+            """
+
+            if self.file_format:
+                unload_query += f'FILE_FORMAT = ({self.file_format})'
+
+        else:
+            unload_query = f"""
+            COPY INTO 's3://{self.s3_bucket}/{self.s3_key}'
+            FROM ({self.query_or_table})
+            STORAGE_INTEGRATION = S3
+            FILE_FORMAT = ({self.file_format})
+            """
+
+        unload_query += f"""{unload_options}
+                            HEADER = {self.include_header};"""
+
+        self.log.info('Executing UNLOAD command...')
+        snowflake_hook.run(unload_query, self.autocommit)

Review comment:
       I recommend using cur.execute instead, so you can run cur.fetchall afterwards and print the output to the logs.
   
   Snowflake tells you which files it created in the unload, and this is useful logging information.
   
   Same as I suggested on your other PR.
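
A minimal sketch of the suggestion above, assuming `conn` is a DB-API connection such as the one returned by `SnowflakeHook.get_conn()`. The helper name `run_and_log_unload` is hypothetical and is not part of the PR.

```python
import logging
from contextlib import closing

log = logging.getLogger(__name__)


def run_and_log_unload(conn, unload_query):
    """Run an unload statement and log every row Snowflake returns.

    ``conn`` is assumed to be a DB-API connection; the helper itself
    is illustrative only.
    """
    # closing() makes sure the cursor is closed even if execute() raises.
    with closing(conn.cursor()) as cur:
        cur.execute(unload_query)
        rows = cur.fetchall()
    for row in rows:
        log.info("Unload result: %s", row)
    return rows
```

Returning the rows lets the operator decide later whether to push them to XCom or only log them.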







[GitHub] [airflow] github-actions[bot] commented on pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#issuecomment-923449090


   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.





[GitHub] [airflow] dstandish commented on a change in pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#discussion_r600049013



##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type warehouse: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param query_or_table: query or full table to unload. If table, it must include the schema like
+        `schema.table`
+    :type query_or_table: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,
+        OVERWRITE etc). Each element of the list has to be a string
+    :type unload_options: list
+    :param role: name of role (will overwrite any role defined in
+        connection's extra JSON)
+    :type role: str
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identify provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :type authenticator: str
+    :param session_parameters: You can set session-level parameters at
+        the time you connect to Snowflake
+    :type session_parameters: dict
+    """
+
+    template_fields = (
+        's3_key',
+        's3_bucket',
+        'table_or_query',
+        'snowflake_conn_id',
+    )
+    template_ext = ('.sql',)
+    ui_color = '#ffebb2'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        stage: Optional[str] = None,
+        s3_bucket: Optional[str],
+        s3_key: str,
+        query_or_table: str,
+        file_format: Optional[str],
+        database: Optional[str] = None,
+        warehouse: Optional[str] = None,
+        snowflake_conn_id: str = "snowflake_default",
+        unload_options: Optional[list] = None,
+        autocommit: bool = True,
+        include_header: bool = False,
+        role: Optional[str] = None,
+        authenticator: Optional[str] = None,
+        session_parameters: Optional[dict] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.stage = stage
+        self.s3_bucket = s3_bucket
+        self.s3_key = s3_key
+        self.file_format = file_format
+        self.warehouse = warehouse
+        self.database = database
+        self.snowflake_conn_id = snowflake_conn_id
+        self.query_or_table = query_or_table
+        self.unload_options = unload_options or []  # type: List
+        self.autocommit = autocommit
+        self.include_header = include_header
+        self.role = role
+        self.authenticator = authenticator
+        self.session_parameters = session_parameters
+
+    def execute(self, context):
+        snowflake_hook = SnowflakeHook(
+            snowflake_conn_id=self.snowflake_conn_id,
+            warehouse=self.warehouse,
+            database=self.database,
+            role=self.role,
+            authenticator=self.authenticator,
+            session_parameters=self.session_parameters,
+        )
+        unload_options = '\n\t'.join(self.unload_options)
+
+        if self.stage:
+            unload_query = f"""
+            COPY INTO @{self.stage}/{self.s3_key}
+            FROM ({self.query_or_table})
+            """
+
+            if self.file_format:
+                unload_query += f'FILE_FORMAT = ({self.file_format})'
+
+        else:
+            unload_query = f"""
+            COPY INTO 's3://{self.s3_bucket}/{self.s3_key}'
+            FROM ({self.query_or_table})
+            STORAGE_INTEGRATION = S3
+            FILE_FORMAT = ({self.file_format})
+            """
+
+        unload_query += f"""{unload_options}
+                            HEADER = {self.include_header};"""
+
+        self.log.info('Executing UNLOAD command...')
+        snowflake_hook.run(unload_query, self.autocommit)

Review comment:
       > Do you think it makes sense to include the cur.fetchall directly in the hook?
   
   Yes, absolutely. Ideally you should also parse the results and provide configurable failure behavior, e.g. should the task fail if any files are skipped, or if any files fail?
   
   > Also, I recall reading somewhere that it is not necessary to open the cursor like a context because snowflake handles that very well, but I don't find it. Do you know anything about it? Thanks!
   
   That might be true. However, all of the example code in the Snowflake docs demonstrates closing connections, and here they mention explicitly that it should be done:
   
   https://docs.snowflake.com/en/user-guide/python-connector-example.html#closing-the-connection
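
One way the two points above could fit together, as a sketch only. The `status` column name, the `LOAD_FAILED` value, and the names `run_and_check` and `CopyResultError` are assumptions for illustration; what a given COPY statement actually returns should be checked against the Snowflake documentation.

```python
from contextlib import closing


class CopyResultError(Exception):
    """Raised when a result row carries a status the caller rejects."""


def run_and_check(conn, sql, fail_on_statuses=("LOAD_FAILED",)):
    """Run ``sql``, close cursor and connection, then check each row.

    The ``status`` column and its values are hypothetical; pass an
    empty tuple to disable the failure check entirely.
    """
    rejected = {status.upper() for status in fail_on_statuses}
    # Both objects are closed on exit, even if execute() raises.
    with closing(conn), closing(conn.cursor()) as cur:
        cur.execute(sql)
        # DB-API: the first item of each description entry is the column name.
        columns = [col[0].lower() for col in cur.description]
        rows = [dict(zip(columns, row)) for row in cur.fetchall()]
    bad = [row for row in rows if str(row.get("status", "")).upper() in rejected]
    if bad:
        raise CopyResultError(f"{len(bad)} file(s) had a rejected status: {bad}")
    return rows
```

Making `fail_on_statuses` a parameter keeps the policy in the hands of the DAG author rather than hardcoding it in the hook.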







[GitHub] [airflow] mik-laj commented on pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#issuecomment-894118493


   > And I'd say it's not really transfer. It's just executing sql. It's purely an interaction with snowflake, and snowflake is handling the "transferring" internally. E.g. we're not pulling the rows out through connector into airflow and writing them out and uploading to s3 -- that would be more of a transfer.
   
   I agree. This is an operator for a specific operation in Snowflake, not a typical transfer operator. For this to be a transfer operator, all credentials should be managed by Airflow to ensure a unified experience. In this case, AWS credentials are managed by Snowflake. In the Google provider, we had a similar case. To move data from one GCS bucket to another, you can use the https://github.com/apache/airflow/blob/1bd3a5c68c88cf3840073d6276460a108f864187/airflow/providers/google/cloud/transfers/gcs_to_gcs.py#L29 operator or use the Google service: https://github.com/apache/airflow/blob/1bd3a5c68c88cf3840073d6276460a108f864187/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py#L166





[GitHub] [airflow] turbaszek commented on a change in pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#discussion_r582220285



##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type warehouse: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param query_or_table: query or full table to unload. If table, it must include the schema like
+        `schema.table`
+    :type query_or_table: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,
+        OVERWRITE etc). Each element of the list has to be a string
+    :type unload_options: list
+    :param role: name of role (will overwrite any role defined in
+        connection's extra JSON)
+    :type role: str
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identify provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :type authenticator: str
+    :param session_parameters: You can set session-level parameters at
+        the time you connect to Snowflake
+    :type session_parameters: dict
+    """
+
+    template_fields = (
+        's3_key',
+        's3_bucket',
+        'table_or_query',
+        'snowflake_conn_id',
+    )
+    template_ext = ('.sql',)
+    ui_color = '#ffebb2'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        stage: Optional[str] = None,
+        s3_bucket: Optional[str],
+        s3_key: str,
+        query_or_table: str,
+        file_format: Optional[str],
+        database: Optional[str] = None,
+        warehouse: Optional[str] = None,
+        snowflake_conn_id: str = "snowflake_default",
+        unload_options: Optional[list] = None,
+        autocommit: bool = True,
+        include_header: bool = False,
+        role: Optional[str] = None,
+        authenticator: Optional[str] = None,
+        session_parameters: Optional[dict] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.stage = stage
+        self.s3_bucket = s3_bucket
+        self.s3_key = s3_key
+        self.file_format = file_format
+        self.warehouse = warehouse
+        self.database = database
+        self.snowflake_conn_id = snowflake_conn_id
+        self.query_or_table = query_or_table
+        self.unload_options = unload_options or []  # type: List

Review comment:
       ```suggestion
           self.unload_options: List = unload_options or []
   ```







[GitHub] [airflow] potiuk commented on pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#issuecomment-876533797


   @dstandish - does it address your concerns :) ? 





[GitHub] [airflow] mik-laj commented on a change in pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#discussion_r684080518



##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,166 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    You can either pass the parameters table and schema or the parameter sql to unload an entire
+    table or a custom query.
+
+    :param s3_path: reference to a specific S3 path within a bucket or stage to download the data
+    :type s3_path: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param schema: reference to a specific schema in snowflake database
+    :type schema: str
+    :param table: reference to a specific table in snowflake database
+    :type table: str
+    :param sql: optional parameter to unload a customized query instead an entire table
+    :type sql: str
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,
+        OVERWRITE etc). Each element of the list has to be a string
+    :type unload_options: list
+    :param include_header: whether or not to include the header columns in the output file(s)
+        if possible
+    :type include_header: bool
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type warehouse: str
+    :param role: name of role (will overwrite any role defined in
+        connection's extra JSON)
+    :type role: str
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identify provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :type authenticator: str
+    :param session_parameters: You can set session-level parameters at
+        the time you connect to Snowflake
+    :type session_parameters: dict
+    """
+
+    template_fields = (
+        's3_path',
+        's3_bucket',
+        'sql',
+        'snowflake_conn_id',
+    )
+    template_ext = ('.sql',)
+    template_fields_renderers = {"sql": "sql"}
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        s3_path: str,
+        snowflake_conn_id: str = "snowflake_default",
+        schema: Optional[str] = None,
+        table: Optional[str] = None,
+        sql: Optional[str] = None,
+        stage: Optional[str] = None,
+        s3_bucket: Optional[str] = None,
+        file_format: Optional[str] = None,
+        unload_options: Optional[list] = None,
+        include_header: Optional[bool] = False,
+        database: Optional[str] = None,
+        warehouse: Optional[str] = None,
+        role: Optional[str] = None,
+        authenticator: Optional[str] = None,
+        session_parameters: Optional[dict] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.s3_path = s3_path
+        self.snowflake_conn_id = snowflake_conn_id
+        self.schema = schema
+        self.table = table
+        self.sql = sql
+        self.stage = stage
+        self.s3_bucket = s3_bucket
+        self.file_format = file_format
+        self.unload_options: List = unload_options or []
+        self.include_header = include_header
+        self.database = database
+        self.warehouse = warehouse
+        self.role = role
+        self.authenticator = authenticator
+        self.session_parameters = session_parameters
+
+    def execute(self, context):
+        snowflake_hook = SnowflakeHook(
+            snowflake_conn_id=self.snowflake_conn_id,
+            warehouse=self.warehouse,
+            database=self.database,
+            role=self.role,
+            authenticator=self.authenticator,
+            session_parameters=self.session_parameters,
+        )
+        unload_options = '\n\t\t\t'.join(self.unload_options)
+
+        if self.sql:
+            self.sql = f"({self.sql})"
+        else:
+            self.sql = self.schema + '.' + self.table
+
+        if self.stage:
+            unload_query = f"""
+            COPY INTO @{self.stage}/{self.s3_path}
+            FROM {self.sql}
+            """
+        else:
+            unload_query = f"""
+            COPY INTO 's3://{self.s3_bucket}/{self.s3_path}'
+            FROM {self.sql}
+            STORAGE_INTEGRATION = S3

Review comment:
       It should be customizable. Each customer can create one or more integrations with different names and role ARNs. See: https://docs.snowflake.com/en/sql-reference/sql/create-storage-integration.html
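
A sketch of what a customizable integration could look like, assuming a new `storage_integration` argument. Both that argument and the helper `build_unload_query` are hypothetical and do not appear in the PR.

```python
def build_unload_query(s3_bucket, s3_path, source, storage_integration, file_format=None):
    """Build a COPY INTO statement with a caller-supplied integration name.

    ``source`` is either ``schema.table`` or a parenthesized query.
    """
    lines = [
        f"COPY INTO 's3://{s3_bucket}/{s3_path}'",
        f"FROM {source}",
        # The integration name is no longer hardcoded to "S3".
        f"STORAGE_INTEGRATION = {storage_integration}",
    ]
    if file_format:
        lines.append(f"FILE_FORMAT = ({file_format})")
    return "\n".join(lines)
```

For example, `build_unload_query("my-bucket", "exports/", "analytics.orders", "my_s3_int")` would reference an integration named `my_s3_int`; all of those values are placeholders.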







[GitHub] [airflow] turbaszek commented on a change in pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#discussion_r582221145



##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type warehouse: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param query_or_table: query or full table to unload. If table, it must include the schema like
+        `schema.table`
+    :type query_or_table: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,
+        OVERWRITE etc). Each element of the list has to be a string
+    :type unload_options: list
+    :param role: name of role (will overwrite any role defined in
+        connection's extra JSON)
+    :type role: str
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identify provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :type authenticator: str
+    :param session_parameters: You can set session-level parameters at
+        the time you connect to Snowflake
+    :type session_parameters: dict
+    """
+
+    template_fields = (
+        's3_key',
+        's3_bucket',
+        'table_or_query',
+        'snowflake_conn_id',
+    )
+    template_ext = ('.sql',)
+    ui_color = '#ffebb2'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        stage: Optional[str] = None,
+        s3_bucket: Optional[str],
+        s3_key: str,
+        query_or_table: str,
+        file_format: Optional[str],
+        database: Optional[str] = None,
+        warehouse: Optional[str] = None,
+        snowflake_conn_id: str = "snowflake_default",
+        unload_options: Optional[list] = None,
+        autocommit: bool = True,
+        include_header: bool = False,
+        role: Optional[str] = None,
+        authenticator: Optional[str] = None,
+        session_parameters: Optional[dict] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.stage = stage
+        self.s3_bucket = s3_bucket
+        self.s3_key = s3_key
+        self.file_format = file_format
+        self.warehouse = warehouse
+        self.database = database
+        self.snowflake_conn_id = snowflake_conn_id
+        self.query_or_table = query_or_table
+        self.unload_options = unload_options or []  # type: List
+        self.autocommit = autocommit
+        self.include_header = include_header
+        self.role = role
+        self.authenticator = authenticator
+        self.session_parameters = session_parameters
+
+    def execute(self, context):
+        snowflake_hook = SnowflakeHook(
+            snowflake_conn_id=self.snowflake_conn_id,
+            warehouse=self.warehouse,
+            database=self.database,
+            role=self.role,
+            authenticator=self.authenticator,
+            session_parameters=self.session_parameters,
+        )
+        unload_options = '\n\t'.join(self.unload_options)
+
+        if self.stage:
+            unload_query = f"""
+            COPY INTO @{self.stage}/{self.s3_key}
+            FROM ({self.query_or_table})
+            """

Review comment:
       Does this allow SQL injection?
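
To make the concern concrete: `stage` and `s3_key` are interpolated straight into the statement, and both are templated. One possible mitigation, sketched below, is to reject values that do not look like a plain identifier or path before building the SQL. The helper names and the two patterns are hypothetical and not part of the PR or of the Snowflake provider, and `query_or_table` is raw SQL by design, so it cannot be checked this way.

```python
import re

# Hypothetical allow-lists: an optionally qualified identifier (db.schema.stage)
# and a conservative set of characters for the path under the stage.
_STAGE = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*(\.[A-Za-z_][A-Za-z0-9_$]*){0,2}")
_PATH = re.compile(r"[A-Za-z0-9_\-./=]+")


def checked(value: str, pattern: "re.Pattern", label: str) -> str:
    """Return value unchanged if it fully matches pattern, otherwise raise."""
    if not pattern.fullmatch(value):
        raise ValueError(f"Refusing to interpolate {label}: {value!r}")
    return value


def build_stage_target(stage: str, key: str) -> str:
    """Build the '@stage/key' target of COPY INTO from validated parts."""
    return f"@{checked(stage, _STAGE, 'stage')}/{checked(key, _PATH, 'key')}"
```

Rejecting is used here instead of escaping because escaping rules for string literals are easy to get wrong; whether these patterns are too strict for real stage names is an open question.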







[GitHub] [airflow] dstandish edited a comment on pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
dstandish edited a comment on pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#issuecomment-894094479


   > My belief is that is easier to import SnowflakeToS3 from amazon.aws.transfers, as all the operators that upload data to S3, than from snowflake.transfers.
   
   I think the easiest thing is having exactly _one_ snowflake copy into location operator and exactly _one_ place from which to import it.
   
   I wouldn't say it's impossible that there could be utility in having a subclass that is storage-specific.  But I think the first thing is producing the generic version, and then we'll be in a better place to evaluate whether the subclass is worth its weight.
   
   And I'd say it's not really a transfer.  It's just executing sql.  It's purely an interaction with snowflake, and snowflake is handling the "transferring" internally.  E.g. we're not pulling the rows out through the connector into airflow and writing them out and uploading to s3 -- _that_ would be a transfer.
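
As a rough illustration of the generic version argued for above, the statement could be assembled by one function that does not care which storage backend the location points at. This is only a sketch: `build_copy_into_location` and its parameters are hypothetical names, not the PR's code, and it assumes the caller passes either a stage reference such as `@my_stage/path/` or a storage URL, and wraps a query in parentheses itself.

```python
from typing import List, Optional


def build_copy_into_location(
    location: str,
    source: str,
    *,
    storage_integration: Optional[str] = None,
    file_format: Optional[str] = None,
    copy_options: Optional[List[str]] = None,
    header: bool = False,
) -> str:
    """Assemble a COPY INTO <location> statement for a stage or a storage URL."""
    # Stage references are written bare; external URLs are single-quoted.
    target = location if location.startswith("@") else f"'{location}'"
    lines = [f"COPY INTO {target}", f"FROM {source}"]
    if storage_integration:
        lines.append(f"STORAGE_INTEGRATION = {storage_integration}")
    if file_format:
        lines.append(f"FILE_FORMAT = ({file_format})")
    lines.extend(copy_options or [])
    lines.append(f"HEADER = {'TRUE' if header else 'FALSE'}")
    return "\n".join(lines)
```

With this shape nothing in the operator is S3-specific: the same call works for `@my_stage/exports/`, `s3://...`, `gcs://...` or `azure://...`, and a storage-specific subclass would only need to fill in `location`.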





[GitHub] [airflow] turbaszek commented on a change in pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#discussion_r582219056



##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type warehouse: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param query_or_table: query or full table to unload. If table, it must include the schema like
+        `schema.table`
+    :type query_or_table: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,
+        OVERWRITE etc). Each element of the list has to be a string
+    :type unload_options: list
+    :param role: name of role (will overwrite any role defined in
+        connection's extra JSON)
+    :type role: str
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identify provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :type authenticator: str
+    :param session_parameters: You can set session-level parameters at
+        the time you connect to Snowflake
+    :type session_parameters: dict
+    """
+
+    template_fields = (
+        's3_key',
+        's3_bucket',
+        'table_or_query',
+        'snowflake_conn_id',
+    )
+    template_ext = ('.sql',)
+    ui_color = '#ffebb2'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        stage: Optional[str] = None,
+        s3_bucket: Optional[str],

Review comment:
       If this is optional, should it have a default value of `None`?
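
For context, `Optional[str]` only says that `None` is an acceptable value; it does not make the argument optional at call time. A small self-contained check, using hypothetical function names rather than the operator itself:

```python
import inspect
from typing import Callable, Optional


def without_default(*, s3_bucket: Optional[str]) -> Optional[str]:
    # Annotated as Optional, but the caller must still pass s3_bucket.
    return s3_bucket


def with_default(*, s3_bucket: Optional[str] = None) -> Optional[str]:
    # The default is what actually lets the caller omit the argument.
    return s3_bucket


def is_required(func: Callable, name: str) -> bool:
    """True when the named parameter has no default value."""
    return inspect.signature(func).parameters[name].default is inspect.Parameter.empty
```

Calling `without_default()` with no arguments raises `TypeError`, while `with_default()` returns `None`.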







[GitHub] [airflow] turbaszek commented on a change in pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#discussion_r582219301



##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type warehouse: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param query_or_table: query or full table to unload. If table, it must include the schema like
+        `schema.table`
+    :type query_or_table: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,
+        OVERWRITE etc). Each element of the list has to be a string
+    :type unload_options: list
+    :param role: name of role (will overwrite any role defined in
+        connection's extra JSON)
+    :type role: str
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identify provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :type authenticator: str
+    :param session_parameters: You can set session-level parameters at
+        the time you connect to Snowflake
+    :type session_parameters: dict
+    """
+
+    template_fields = (
+        's3_key',
+        's3_bucket',
+        'table_or_query',
+        'snowflake_conn_id',
+    )
+    template_ext = ('.sql',)
+    ui_color = '#ffebb2'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        stage: Optional[str] = None,

Review comment:
       Can you please move this after the required arguments?







[GitHub] [airflow] dstandish commented on a change in pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#discussion_r600049013



##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type warehouse: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param query_or_table: query or full table to unload. If table, it must include the schema like
+        `schema.table`
+    :type query_or_table: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,
+        OVERWRITE etc). Each element of the list has to be a string
+    :type unload_options: list
+    :param role: name of role (will overwrite any role defined in
+        connection's extra JSON)
+    :type role: str
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identify provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :type authenticator: str
+    :param session_parameters: You can set session-level parameters at
+        the time you connect to Snowflake
+    :type session_parameters: dict
+    """
+
+    template_fields = (
+        's3_key',
+        's3_bucket',
+        'table_or_query',
+        'snowflake_conn_id',
+    )
+    template_ext = ('.sql',)
+    ui_color = '#ffebb2'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        stage: Optional[str] = None,
+        s3_bucket: Optional[str],
+        s3_key: str,
+        query_or_table: str,
+        file_format: Optional[str],
+        database: Optional[str] = None,
+        warehouse: Optional[str] = None,
+        snowflake_conn_id: str = "snowflake_default",
+        unload_options: Optional[list] = None,
+        autocommit: bool = True,
+        include_header: bool = False,
+        role: Optional[str] = None,
+        authenticator: Optional[str] = None,
+        session_parameters: Optional[dict] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.stage = stage
+        self.s3_bucket = s3_bucket
+        self.s3_key = s3_key
+        self.file_format = file_format
+        self.warehouse = warehouse
+        self.database = database
+        self.snowflake_conn_id = snowflake_conn_id
+        self.query_or_table = query_or_table
+        self.unload_options = unload_options or []  # type: List
+        self.autocommit = autocommit
+        self.include_header = include_header
+        self.role = role
+        self.authenticator = authenticator
+        self.session_parameters = session_parameters
+
+    def execute(self, context):
+        snowflake_hook = SnowflakeHook(
+            snowflake_conn_id=self.snowflake_conn_id,
+            warehouse=self.warehouse,
+            database=self.database,
+            role=self.role,
+            authenticator=self.authenticator,
+            session_parameters=self.session_parameters,
+        )
+        unload_options = '\n\t'.join(self.unload_options)
+
+        if self.stage:
+            unload_query = f"""
+            COPY INTO @{self.stage}/{self.s3_key}
+            FROM ({self.query_or_table})
+            """
+
+            if self.file_format:
+                unload_query += f'FILE_FORMAT = ({self.file_format})'
+
+        else:
+            unload_query = f"""
+            COPY INTO 's3://{self.s3_bucket}/{self.s3_key}'
+            FROM ({self.query_or_table})
+            STORAGE_INTEGRATION = S3
+            FILE_FORMAT = ({self.file_format})
+            """
+
+        unload_query += f"""{unload_options}
+                            HEADER = {self.include_header};"""
+
+        self.log.info('Executing UNLOAD command...')
+        snowflake_hook.run(unload_query, self.autocommit)

Review comment:
       > Do you think it makes sense to include the cur.fetchall directly in the hook?
   
   yes i think so, for sure.  it's useful to be able to see what happened.
   
   and the same is true in your other PR with the copy into table case.  in _that_ case ideally you should also parse the results and provide configurable failure behavior.  e.g. should the task fail if any files are skipped? or if any files failed?
   
   > Also, I recall reading somewhere that it is not necessary to open the cursor like a context because snowflake handles that very well, but I don't find it. Do you know anything about it? Thanks!
   
   That might be true.  However, all of the example code in the snowflake docs demonstrates closing connections, and here they mention explicitly that it should be done:
   
   https://docs.snowflake.com/en/user-guide/python-connector-example.html#closing-the-connection
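
A minimal sketch of the pattern discussed above: open the cursor in a context that always closes it, and return whatever `fetchall()` gives back so the caller can log it. It relies only on the generic DB-API `cursor()` / `execute()` / `fetchall()` / `close()` methods; `run_and_fetch` is a hypothetical helper, and `sqlite3` is used purely as a stand-in so the example runs without a Snowflake account.

```python
import sqlite3
from contextlib import closing


def run_and_fetch(conn, sql: str) -> list:
    """Execute one statement and return all result rows, closing the cursor afterwards."""
    with closing(conn.cursor()) as cur:
        cur.execute(sql)
        return cur.fetchall()


# sqlite3 stands in for the Snowflake connection here (assumption for the demo).
with closing(sqlite3.connect(":memory:")) as conn:
    rows = run_and_fetch(conn, "SELECT 'data_0_0_0.csv.gz' AS file_name, 3 AS row_count")
```

`contextlib.closing` calls `close()` on exit even if `execute()` raises, which is the behavior the linked docs ask for.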







[GitHub] [airflow] mik-laj commented on a change in pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#discussion_r684082651



##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,166 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    You can either pass the parameters table and schema or the parameter sql to unload an entire
+    table or a custom query.
+
+    :param s3_path: reference to a specific S3 path within a bucket or stage to download the data
+    :type s3_path: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param schema: reference to a specific schema in snowflake database
+    :type schema: str
+    :param table: reference to a specific table in snowflake database
+    :type table: str
+    :param sql: optional parameter to unload a customized query instead an entire table
+    :type sql: str
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,

Review comment:
       Can you add a link to the docs that explain all available options? https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html







[GitHub] [airflow] dstandish commented on a change in pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#discussion_r600049013



##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type warehouse: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param query_or_table: query or full table to unload. If table, it must include the schema like
+        `schema.table`
+    :type query_or_table: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,
+        OVERWRITE etc). Each element of the list has to be a string
+    :type unload_options: list
+    :param role: name of role (will overwrite any role defined in
+        connection's extra JSON)
+    :type role: str
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identify provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :type authenticator: str
+    :param session_parameters: You can set session-level parameters at
+        the time you connect to Snowflake
+    :type session_parameters: dict
+    """
+
+    template_fields = (
+        's3_key',
+        's3_bucket',
+        'table_or_query',
+        'snowflake_conn_id',
+    )
+    template_ext = ('.sql',)
+    ui_color = '#ffebb2'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        stage: Optional[str] = None,
+        s3_bucket: Optional[str],
+        s3_key: str,
+        query_or_table: str,
+        file_format: Optional[str],
+        database: Optional[str] = None,
+        warehouse: Optional[str] = None,
+        snowflake_conn_id: str = "snowflake_default",
+        unload_options: Optional[list] = None,
+        autocommit: bool = True,
+        include_header: bool = False,
+        role: Optional[str] = None,
+        authenticator: Optional[str] = None,
+        session_parameters: Optional[dict] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.stage = stage
+        self.s3_bucket = s3_bucket
+        self.s3_key = s3_key
+        self.file_format = file_format
+        self.warehouse = warehouse
+        self.database = database
+        self.snowflake_conn_id = snowflake_conn_id
+        self.query_or_table = query_or_table
+        self.unload_options = unload_options or []  # type: List
+        self.autocommit = autocommit
+        self.include_header = include_header
+        self.role = role
+        self.authenticator = authenticator
+        self.session_parameters = session_parameters
+
+    def execute(self, context):
+        snowflake_hook = SnowflakeHook(
+            snowflake_conn_id=self.snowflake_conn_id,
+            warehouse=self.warehouse,
+            database=self.database,
+            role=self.role,
+            authenticator=self.authenticator,
+            session_parameters=self.session_parameters,
+        )
+        unload_options = '\n\t'.join(self.unload_options)
+
+        if self.stage:
+            unload_query = f"""
+            COPY INTO @{self.stage}/{self.s3_key}
+            FROM ({self.query_or_table})
+            """
+
+            if self.file_format:
+                unload_query += f'FILE_FORMAT = ({self.file_format})'
+
+        else:
+            unload_query = f"""
+            COPY INTO 's3://{self.s3_bucket}/{self.s3_key}'
+            FROM ({self.query_or_table})
+            STORAGE_INTEGRATION = S3
+            FILE_FORMAT = ({self.file_format})
+            """
+
+        unload_query += f"""{unload_options}
+                            HEADER = {self.include_header};"""
+
+        self.log.info('Executing UNLOAD command...')
+        snowflake_hook.run(unload_query, self.autocommit)

Review comment:
       > Do you think it makes sense to include the cur.fetchall directly in the hook?
   
   yes i think so.  it's useful to be able to see what happened.
   
   and the same is true in your other PR with the copy into table case.  in _that_ case ideally you should also parse the results and provide configurable failure behavior.  e.g. should the task fail if any files are skipped? or if any files failed?
   
   > Also, I recall reading somewhere that it is not necessary to open the cursor like a context because snowflake handles that very well, but I don't find it. Do you know anything about it? Thanks!
   
   That might be true.  However, all of the example code in the snowflake docs demonstrates closing connections, and here they mention explicitly that it should be done:
   
   https://docs.snowflake.com/en/user-guide/python-connector-example.html#closing-the-connection
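
The configurable failure behavior mentioned above for the copy into table case could look roughly like this. Everything here is an assumption for illustration: the rows are taken to be dicts with `file` and `status` keys, and the status strings (`LOADED`, `LOAD_FAILED`, `LOAD_SKIPPED`) should be checked against what the statement actually returns.

```python
from typing import Dict, Iterable, List, Tuple


class CopyResultError(Exception):
    """Raised when COPY results violate the configured policy (hypothetical)."""


def check_copy_results(
    rows: Iterable[Dict[str, str]],
    *,
    fail_on: Tuple[str, ...] = ("LOAD_FAILED",),
) -> List[str]:
    """Return the processed file names, or raise if any row has a status in fail_on."""
    rows = list(rows)
    bad = [row["file"] for row in rows if row["status"] in fail_on]
    if bad:
        raise CopyResultError(f"Files with disallowed status {fail_on}: {bad}")
    return [row["file"] for row in rows]
```

An operator argument could map directly onto `fail_on`, e.g. `("LOAD_FAILED", "LOAD_SKIPPED")` for a strict task, or an empty tuple to only log the results.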







[GitHub] [airflow] JavierLopezT commented on a change in pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
JavierLopezT commented on a change in pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#discussion_r599830260



##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type warehouse: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param query_or_table: query or full table to unload. If table, it must include the schema like
+        `schema.table`
+    :type query_or_table: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,
+        OVERWRITE etc). Each element of the list has to be a string
+    :type unload_options: list
+    :param role: name of role (will overwrite any role defined in
+        connection's extra JSON)
+    :type role: str
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identify provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :type authenticator: str
+    :param session_parameters: You can set session-level parameters at
+        the time you connect to Snowflake
+    :type session_parameters: dict
+    """
+
+    template_fields = (
+        's3_key',
+        's3_bucket',
+        'table_or_query',
+        'snowflake_conn_id',
+    )
+    template_ext = ('.sql',)
+    ui_color = '#ffebb2'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        stage: Optional[str] = None,
+        s3_bucket: Optional[str],
+        s3_key: str,
+        query_or_table: str,
+        file_format: Optional[str],
+        database: Optional[str] = None,
+        warehouse: Optional[str] = None,
+        snowflake_conn_id: str = "snowflake_default",
+        unload_options: Optional[list] = None,
+        autocommit: bool = True,
+        include_header: bool = False,
+        role: Optional[str] = None,
+        authenticator: Optional[str] = None,
+        session_parameters: Optional[dict] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.stage = stage
+        self.s3_bucket = s3_bucket
+        self.s3_key = s3_key
+        self.file_format = file_format
+        self.warehouse = warehouse
+        self.database = database
+        self.snowflake_conn_id = snowflake_conn_id
+        self.query_or_table = query_or_table
+        self.unload_options = unload_options or []  # type: List
+        self.autocommit = autocommit
+        self.include_header = include_header
+        self.role = role
+        self.authenticator = authenticator
+        self.session_parameters = session_parameters
+
+    def execute(self, context):
+        snowflake_hook = SnowflakeHook(
+            snowflake_conn_id=self.snowflake_conn_id,
+            warehouse=self.warehouse,
+            database=self.database,
+            role=self.role,
+            authenticator=self.authenticator,
+            session_parameters=self.session_parameters,
+        )
+        unload_options = '\n\t'.join(self.unload_options)
+
+        if self.stage:
+            unload_query = f"""
+            COPY INTO @{self.stage}/{self.s3_key}
+            FROM ({self.query_or_table})
+            """
+
+            if self.file_format:
+                unload_query += f'FILE_FORMAT = ({self.file_format})'
+
+        else:
+            unload_query = f"""
+            COPY INTO 's3://{self.s3_bucket}/{self.s3_key}'
+            FROM ({self.query_or_table})
+            STORAGE_INTEGRATION = S3
+            FILE_FORMAT = ({self.file_format})
+            """
+
+        unload_query += f"""{unload_options}
+                            HEADER = {self.include_header};"""
+
+        self.log.info('Executing UNLOAD command...')
+        snowflake_hook.run(unload_query, self.autocommit)

Review comment:
       Do you think it makes sense to include the cur.fetchall directly in the hook?
   
   Also, I recall reading somewhere that it is not necessary to open the cursor as a context manager because snowflake handles that very well, but I can't find it. Do you know anything about it? Thanks!







[GitHub] [airflow] potiuk commented on pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#issuecomment-877630621


   @JavierLopezT I am with @dstandish with that one :)





[GitHub] [airflow] turbaszek commented on a change in pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#discussion_r582221782



##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type warehouse: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param query_or_table: query or full table to unload. If table, it must include the schema like
+        `schema.table`
+    :type query_or_table: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,
+        OVERWRITE etc). Each element of the list has to be a string
+    :type unload_options: list
+    :param role: name of role (will overwrite any role defined in
+        connection's extra JSON)
+    :type role: str
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identify provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :type authenticator: str
+    :param session_parameters: You can set session-level parameters at
+        the time you connect to Snowflake
+    :type session_parameters: dict
+    """
+
+    template_fields = (
+        's3_key',
+        's3_bucket',
+        'table_or_query',
+        'snowflake_conn_id',
+    )
+    template_ext = ('.sql',)
+    ui_color = '#ffebb2'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        stage: Optional[str] = None,
+        s3_bucket: Optional[str],
+        s3_key: str,
+        query_or_table: str,
+        file_format: Optional[str],
+        database: Optional[str] = None,
+        warehouse: Optional[str] = None,
+        snowflake_conn_id: str = "snowflake_default",
+        unload_options: Optional[list] = None,
+        autocommit: bool = True,
+        include_header: bool = False,
+        role: Optional[str] = None,
+        authenticator: Optional[str] = None,
+        session_parameters: Optional[dict] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.stage = stage
+        self.s3_bucket = s3_bucket
+        self.s3_key = s3_key
+        self.file_format = file_format
+        self.warehouse = warehouse
+        self.database = database
+        self.snowflake_conn_id = snowflake_conn_id
+        self.query_or_table = query_or_table
+        self.unload_options = unload_options or []  # type: List
+        self.autocommit = autocommit
+        self.include_header = include_header
+        self.role = role
+        self.authenticator = authenticator
+        self.session_parameters = session_parameters
+
+    def execute(self, context):
+        snowflake_hook = SnowflakeHook(
+            snowflake_conn_id=self.snowflake_conn_id,
+            warehouse=self.warehouse,
+            database=self.database,
+            role=self.role,
+            authenticator=self.authenticator,
+            session_parameters=self.session_parameters,
+        )
+        unload_options = '\n\t'.join(self.unload_options)
+
+        if self.stage:
+            unload_query = f"""
+            COPY INTO @{self.stage}/{self.s3_key}
+            FROM ({self.query_or_table})
+            """
+
+            if self.file_format:
+                unload_query += f'FILE_FORMAT = ({self.file_format})'
+
+        else:
+            unload_query = f"""
+            COPY INTO 's3://{self.s3_bucket}/{self.s3_key}'
+            FROM ({self.query_or_table})
+            STORAGE_INTEGRATION = S3
+            FILE_FORMAT = ({self.file_format})
+            """
+
+        unload_query += f"""{unload_options}
+                            HEADER = {self.include_header};"""

Review comment:
       There is a lot of leading whitespace in all the strings - is this intended? I'm wondering how this will render in the log
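
One possible way to avoid the leading whitespace, sketched under the assumption that the query is built from an indented triple-quoted f-string as in the diff above, is `textwrap.dedent`. The function name `build_query` and the sample arguments are made up for the illustration.

```python
from textwrap import dedent


def build_query(stage, prefix, source):
    """Build the COPY INTO statement without the indentation of the Python source."""
    query = f"""
        COPY INTO @{stage}/{prefix}
        FROM ({source})
    """
    # dedent() removes the common leading whitespace; strip() drops the blank edges.
    return dedent(query).strip()


print(build_query("my_stage", "exports/orders", "SELECT * FROM analytics.orders"))
```

Note that `dedent` only removes the margin shared by all lines, so a multi-line `source` with different indentation would change the result.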







[GitHub] [airflow] dstandish commented on a change in pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#discussion_r583704890



##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):

Review comment:
       Since you are using unload for this, I recommend making it storage agnostic.
   
   It's the same case as that other discussion we had going s3-to-snowflake.
   
   The snowflake unload command doesn't care what storage is behind the stage.
   
   So I recommend making this a generic SnowflakeUnloadOperator.  There's no need to use s3-specific terminology.  
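
A minimal sketch of what the storage-agnostic variant could look like when unloading to a named stage. Nothing here is S3-specific because the stage hides the backend. `build_unload_query` and its parameter names are hypothetical, not part of any existing provider.

```python
def build_unload_query(stage, prefix, query, file_format=None, options=()):
    """Build a COPY INTO <location> statement that targets a named stage."""
    lines = [
        f"COPY INTO @{stage}/{prefix}",
        f"FROM ({query})",
    ]
    # The stage may already define a file format, so this clause is optional.
    if file_format:
        lines.append(f"FILE_FORMAT = ({file_format})")
    # Extra copy options such as "SINGLE = TRUE" are appended one per line.
    lines.extend(options)
    return "\n".join(lines) + ";"


sql = build_unload_query(
    stage="my_stage",
    prefix="exports/orders",
    query="SELECT * FROM analytics.orders",
    file_format="TYPE = CSV",
    options=["OVERWRITE = TRUE"],
)
```

The same call would work unchanged whether `my_stage` points at S3, GCS or Azure storage.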

##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type warehouse: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake

Review comment:
       it's not necessarily a key.  If the data is large snowflake will split files.  Perhaps better to call it `path` or `prefix`?
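
To illustrate why `prefix` may be the more accurate name, here is a small sketch. The object names are invented for the example (the exact suffixes Snowflake appends are not asserted here), and `files_under_prefix` is a hypothetical helper.

```python
def files_under_prefix(keys, prefix):
    """Return every object key that starts with the given prefix, sorted."""
    return sorted(key for key in keys if key.startswith(prefix))


# Made-up listing of a bucket after one unload was split into several files.
keys = [
    "exports/orders_0_0_0.csv.gz",
    "exports/orders_0_1_0.csv.gz",
    "exports/customers_0_0_0.csv.gz",
]
unloaded = files_under_prefix(keys, "exports/orders")
```

A single value passed by the user can therefore match several objects, which is the behavior of a prefix rather than a key.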







[GitHub] [airflow] github-actions[bot] commented on pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#issuecomment-835583365


   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.





[GitHub] [airflow] github-actions[bot] commented on pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#issuecomment-785037866


   [The Workflow run](https://github.com/apache/airflow/actions/runs/595993037) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] JavierLopezT commented on pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
JavierLopezT commented on pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#issuecomment-927983491


   Closing. At some point, I might open a PR with SnowflakeCopyIntoLocationOperator





[GitHub] [airflow] dstandish commented on a change in pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#discussion_r583717028



##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type warehouse: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param query_or_table: query or full table to unload. If table, it must include the schema like
+        `schema.table`
+    :type query_or_table: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,
+        OVERWRITE etc). Each element of the list has to be a string
+    :type unload_options: list
+    :param role: name of role (will overwrite any role defined in
+        connection's extra JSON)
+    :type role: str
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identify provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :type authenticator: str
+    :param session_parameters: You can set session-level parameters at
+        the time you connect to Snowflake
+    :type session_parameters: dict
+    """
+
+    template_fields = (
+        's3_key',
+        's3_bucket',
+        'table_or_query',
+        'snowflake_conn_id',
+    )
+    template_ext = ('.sql',)
+    ui_color = '#ffebb2'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        stage: Optional[str] = None,
+        s3_bucket: Optional[str],
+        s3_key: str,
+        query_or_table: str,
+        file_format: Optional[str],
+        database: Optional[str] = None,
+        warehouse: Optional[str] = None,
+        snowflake_conn_id: str = "snowflake_default",
+        unload_options: Optional[list] = None,
+        autocommit: bool = True,
+        include_header: bool = False,
+        role: Optional[str] = None,
+        authenticator: Optional[str] = None,
+        session_parameters: Optional[dict] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.stage = stage
+        self.s3_bucket = s3_bucket
+        self.s3_key = s3_key
+        self.file_format = file_format
+        self.warehouse = warehouse
+        self.database = database
+        self.snowflake_conn_id = snowflake_conn_id
+        self.query_or_table = query_or_table
+        self.unload_options = unload_options or []  # type: List
+        self.autocommit = autocommit
+        self.include_header = include_header
+        self.role = role
+        self.authenticator = authenticator
+        self.session_parameters = session_parameters
+
+    def execute(self, context):
+        snowflake_hook = SnowflakeHook(
+            snowflake_conn_id=self.snowflake_conn_id,
+            warehouse=self.warehouse,
+            database=self.database,
+            role=self.role,
+            authenticator=self.authenticator,
+            session_parameters=self.session_parameters,
+        )
+        unload_options = '\n\t'.join(self.unload_options)
+
+        if self.stage:
+            unload_query = f"""
+            COPY INTO @{self.stage}/{self.s3_key}
+            FROM ({self.query_or_table})
+            """
+
+            if self.file_format:
+                unload_query += f'FILE_FORMAT = ({self.file_format})'
+
+        else:
+            unload_query = f"""
+            COPY INTO 's3://{self.s3_bucket}/{self.s3_key}'

Review comment:
       ah i see you allow for not using stage.
   
   but here too the principle is the same.
   
   better to make the terminology generic and parameterize `storage_integration` i think.... that way we don't need to duplicate for all storage providers
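
A hedged sketch of the non-stage branch with `storage_integration` as a parameter instead of the hard-coded `S3` value. The function name and arguments are hypothetical. The URL is passed through as-is, so the caller chooses the scheme.

```python
def build_external_unload_query(url, query, storage_integration, file_format):
    """Build a COPY INTO statement for an external location given by URL."""
    return (
        f"COPY INTO '{url}'\n"
        f"FROM ({query})\n"
        # The integration name comes from the caller rather than being fixed to S3.
        f"STORAGE_INTEGRATION = {storage_integration}\n"
        f"FILE_FORMAT = ({file_format});"
    )


s3_sql = build_external_unload_query(
    "s3://my-bucket/exports/orders", "SELECT 1", "my_s3_integration", "TYPE = CSV"
)
gcs_sql = build_external_unload_query(
    "gcs://my-bucket/exports/orders", "SELECT 1", "my_gcs_integration", "TYPE = CSV"
)
```

With this shape, a single operator could cover every storage provider that Snowflake supports through storage integrations.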







[GitHub] [airflow] dstandish commented on a change in pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#discussion_r583736296



##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type warehouse: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param query_or_table: query or full table to unload. If table, it must include the schema like
+        `schema.table`
+    :type query_or_table: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,
+        OVERWRITE etc). Each element of the list has to be a string
+    :type unload_options: list
+    :param role: name of role (will overwrite any role defined in
+        connection's extra JSON)
+    :type role: str
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identify provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :type authenticator: str
+    :param session_parameters: You can set session-level parameters at
+        the time you connect to Snowflake
+    :type session_parameters: dict
+    """
+
+    template_fields = (
+        's3_key',
+        's3_bucket',
+        'table_or_query',
+        'snowflake_conn_id',
+    )
+    template_ext = ('.sql',)
+    ui_color = '#ffebb2'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        stage: Optional[str] = None,
+        s3_bucket: Optional[str],
+        s3_key: str,
+        query_or_table: str,
+        file_format: Optional[str],
+        database: Optional[str] = None,
+        warehouse: Optional[str] = None,
+        snowflake_conn_id: str = "snowflake_default",
+        unload_options: Optional[list] = None,
+        autocommit: bool = True,
+        include_header: bool = False,

Review comment:
       This _only_ applies to CSV. It could be part of a `SnowflakeCsvFormat` class, as I suggested above.
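
A minimal sketch of what such a class might look like. Only the name `SnowflakeCsvFormat` comes from the comment above; the fields and the methods `file_format_clause` and `header_clause` are hypothetical and not part of this PR.

```python
from dataclasses import dataclass


@dataclass
class SnowflakeCsvFormat:
    """Hypothetical helper that keeps CSV-only options in one place."""

    field_delimiter: str = ","
    include_header: bool = False  # assumption: only relevant for CSV output

    def file_format_clause(self) -> str:
        # Render an inline FILE_FORMAT clause for a COPY INTO statement.
        return f"FILE_FORMAT = (TYPE = CSV FIELD_DELIMITER = '{self.field_delimiter}')"

    def header_clause(self) -> str:
        # HEADER is emitted as its own option, outside FILE_FORMAT.
        return f"HEADER = {str(self.include_header).upper()}"
```

With this shape, the operator would accept one format object instead of a free-form `file_format` string plus a separate `include_header` flag.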







[GitHub] [airflow] JavierLopezT commented on pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
JavierLopezT commented on pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#issuecomment-836335751


   comment





[GitHub] [airflow] JavierLopezT closed pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
JavierLopezT closed pull request #14415:
URL: https://github.com/apache/airflow/pull/14415


   





[GitHub] [airflow] dstandish commented on a change in pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#discussion_r583705482



##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type database: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake

Review comment:
       `s3_key` is not necessarily a _key_: if the data is large, Snowflake will split it into multiple files. Perhaps it would be better to call it `path` or `prefix`?







[GitHub] [airflow] turbaszek commented on a change in pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#discussion_r582220285



##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type database: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param query_or_table: query or full table to unload. If table, it must include the schema like
+        `schema.table`
+    :type query_or_table: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,
+        OVERWRITE etc). Each element of the list has to be a string
+    :type unload_options: list
+    :param role: name of role (will overwrite any role defined in
+        connection's extra JSON)
+    :type role: str
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identify provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :type authenticator: str
+    :param session_parameters: You can set session-level parameters at
+        the time you connect to Snowflake
+    :type session_parameters: dict
+    """
+
+    template_fields = (
+        's3_key',
+        's3_bucket',
+        'query_or_table',
+        'snowflake_conn_id',
+    )
+    template_ext = ('.sql',)
+    ui_color = '#ffebb2'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        stage: Optional[str] = None,
+        s3_bucket: Optional[str],
+        s3_key: str,
+        query_or_table: str,
+        file_format: Optional[str],
+        database: Optional[str] = None,
+        warehouse: Optional[str] = None,
+        snowflake_conn_id: str = "snowflake_default",
+        unload_options: Optional[list] = None,
+        autocommit: bool = True,
+        include_header: bool = False,
+        role: Optional[str] = None,
+        authenticator: Optional[str] = None,
+        session_parameters: Optional[dict] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.stage = stage
+        self.s3_bucket = s3_bucket
+        self.s3_key = s3_key
+        self.file_format = file_format
+        self.warehouse = warehouse
+        self.database = database
+        self.snowflake_conn_id = snowflake_conn_id
+        self.query_or_table = query_or_table
+        self.unload_options = unload_options or []  # type: List

Review comment:
       ```suggestion
           self.unload_options: List = unload_options or []
   ```
   It would be good to specify the type of the objects in this list.
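
One way to make the element type explicit, sketched outside the operator. The docstring says each element has to be a string, so `List[str]` is assumed here; the helper name `normalize_unload_options` is hypothetical.

```python
from typing import List, Optional


def normalize_unload_options(unload_options: Optional[List[str]] = None) -> List[str]:
    """Return the options as a list of strings, defaulting to an empty list."""
    options: List[str] = list(unload_options or [])
    for option in options:
        # Fail early instead of producing a malformed COPY INTO statement.
        if not isinstance(option, str):
            raise TypeError(f"unload option must be a str, got {type(option).__name__}")
    return options
```

Inside `__init__` the same idea reduces to `self.unload_options: List[str] = unload_options or []`.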







[GitHub] [airflow] JavierLopezT commented on pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
JavierLopezT commented on pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#issuecomment-894080303


   Hello @potiuk and @dstandish. I can try to create a generic operator. However, I think it would be good to keep the specific operators as classes derived from the generic one, so they remain easy to import. I believe it is easier to import SnowflakeToS3 from amazon.aws.transfers, like all the other operators that upload data to S3, than from snowflake.transfers.
   
   What do you think?
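
A rough sketch of the layout being proposed, using plain classes rather than Airflow's `BaseOperator` so that it runs on its own. The generic class name follows the `SnowflakeCopyIntoLocationOperator` suggestion made earlier in this thread; the `location` parameter and the `build_query` method are assumptions for illustration.

```python
class SnowflakeCopyIntoLocationOperator:
    """Generic, backend-agnostic unload (would live in the snowflake provider)."""

    def __init__(self, *, location: str, query: str) -> None:
        self.location = location
        self.query = query

    def build_query(self) -> str:
        # The location can be any URL or stage that Snowflake can write to.
        return f"COPY INTO '{self.location}' FROM ({self.query})"


class SnowflakeToS3Operator(SnowflakeCopyIntoLocationOperator):
    """Thin S3 wrapper (would live in amazon.aws.transfers)."""

    def __init__(self, *, s3_bucket: str, s3_key: str, **kwargs) -> None:
        super().__init__(location=f"s3://{s3_bucket}/{s3_key}", **kwargs)
```

The subclass only translates S3-specific arguments into a location, so all of the query logic stays in the generic class.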





[GitHub] [airflow] mik-laj commented on a change in pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#discussion_r684079133



##########
File path: tests/providers/amazon/aws/transfers/test_snowflake_to_s3.py
##########
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+from unittest import mock
+
+from airflow.providers.amazon.aws.transfers.snowflake_to_s3 import SnowflakeToS3Operator
+from tests.test_utils.asserts import assert_equal_ignore_multiple_spaces
+
+
+class TestSnowflakeToS3Transfer(unittest.TestCase):
+    @mock.patch("airflow.providers.snowflake.hooks.snowflake.SnowflakeHook.run")
+    def test_execute(
+        self,
+        mock_run,
+    ):
+
+        table = "table"
+        schema = "schema"
+        s3_bucket = "bucket"
+        s3_path = "key"
+        unload_options = [
+            'OVERWRITE = TRUE',
+        ]
+        file_format = "file_format"
+        include_header = True
+
+        SnowflakeToS3Operator(
+            table=table,
+            schema=schema,
+            s3_bucket=s3_bucket,
+            s3_path=s3_path,
+            file_format="file_format",
+            unload_options=unload_options,
+            include_header=include_header,
+            task_id="task_id",
+            dag=None,
+        ).execute(None)
+
+        unload_options = '\n\t\t\t'.join(unload_options)
+        unload_query = f"""
+        COPY INTO 's3://{s3_bucket}/{s3_path}'
+        FROM {schema}.{table}
+        STORAGE_INTEGRATION = S3"""

Review comment:
       It should be customizable. Each customer can create one or more integrations with different names and role ARNs. See: https://docs.snowflake.com/en/sql-reference/sql/create-storage-integration.html
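
A minimal sketch of how the integration name could be passed in rather than hardcoded. The function `build_unload_query` and its `storage_integration` parameter are hypothetical and not taken from the PR.

```python
def build_unload_query(s3_bucket: str, s3_path: str, source: str, storage_integration: str) -> str:
    """Build a COPY INTO statement with a caller-supplied storage integration."""
    return (
        f"COPY INTO 's3://{s3_bucket}/{s3_path}'\n"
        f"FROM {source}\n"
        # Previously hardcoded as "S3"; now supplied by the caller.
        f"STORAGE_INTEGRATION = {storage_integration}"
    )
```

In the operator this would become a constructor argument, and the test above would then assert against whatever name it passes in.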







[GitHub] [airflow] dstandish commented on a change in pull request #14415: SnowflakeToS3Operator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14415:
URL: https://github.com/apache/airflow/pull/14415#discussion_r583718992



##########
File path: airflow/providers/amazon/aws/transfers/snowflake_to_s3.py
##########
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Transfers data from Snowflake into a S3 Bucket."""
+from typing import List, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class SnowflakeToS3Operator(BaseOperator):
+    """
+    UNLOAD a query or a table from Snowflake to S3. It is not necessary a conn to S3 because
+    Snowflake handles it. You have to follow one of the setups in:
+    https://docs.snowflake.com/en/user-guide/data-load-s3-config.html
+
+    :param stage: Reference to a specific Snowflake stage to copy the data into it. This allows you to use
+        the method 'Configuring AWS IAM User Credentials' for unloading data to s3. If this is passed,
+        s3_bucket can't be used and file_format is not necessary
+    :type stage: str
+    :param warehouse: reference to a specific snowflake warehouse to override the one in the conn
+    :type warehouse: str
+    :param database: reference to a specific snowflake database to override the one in the conn
+    :type database: str
+    :param s3_bucket: reference to a specific S3 bucket where the data will be saved. For using it, you
+        should have done the one-time setup 'Configuring a Snowflake Storage Integration'
+    :type s3_bucket: str
+    :param s3_key: reference to a specific S3 key within a bucket or stage.
+    :type s3_key: str
+    :param file_format: can be either a previous file format created in Snowflake
+        or hardcoded one like ``type = csv field_delimiter = ',' skip_header = 1``
+    :type file_format: str
+    :param query_or_table: query or full table to unload. If table, it must include the schema like
+        `schema.table`
+    :type query_or_table: str
+    :param snowflake_conn_id: reference to a specific snowflake connection
+    :type snowflake_conn_id: str
+    :param unload_options: reference to a list of UNLOAD options (SINGLE, MAX_FILE_SIZE,
+        OVERWRITE etc). Each element of the list has to be a string
+    :type unload_options: list
+    :param role: name of role (will overwrite any role defined in
+        connection's extra JSON)
+    :type role: str
+    :param authenticator: authenticator for Snowflake.
+        'snowflake' (default) to use the internal Snowflake authenticator
+        'externalbrowser' to authenticate using your web browser and
+        Okta, ADFS or any other SAML 2.0-compliant identify provider
+        (IdP) that has been defined for your account
+        'https://<your_okta_account_name>.okta.com' to authenticate
+        through native Okta.
+    :type authenticator: str
+    :param session_parameters: You can set session-level parameters at
+        the time you connect to Snowflake
+    :type session_parameters: dict
+    """
+
+    template_fields = (
+        's3_key',
+        's3_bucket',
+        'query_or_table',
+        'snowflake_conn_id',
+    )
+    template_ext = ('.sql',)
+    ui_color = '#ffebb2'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        stage: Optional[str] = None,
+        s3_bucket: Optional[str],
+        s3_key: str,
+        query_or_table: str,
+        file_format: Optional[str],
+        database: Optional[str] = None,
+        warehouse: Optional[str] = None,
+        snowflake_conn_id: str = "snowflake_default",
+        unload_options: Optional[list] = None,
+        autocommit: bool = True,
+        include_header: bool = False,
+        role: Optional[str] = None,
+        authenticator: Optional[str] = None,
+        session_parameters: Optional[dict] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.stage = stage
+        self.s3_bucket = s3_bucket
+        self.s3_key = s3_key
+        self.file_format = file_format
+        self.warehouse = warehouse
+        self.database = database
+        self.snowflake_conn_id = snowflake_conn_id
+        self.query_or_table = query_or_table
+        self.unload_options = unload_options or []  # type: List
+        self.autocommit = autocommit
+        self.include_header = include_header
+        self.role = role
+        self.authenticator = authenticator
+        self.session_parameters = session_parameters
+
+    def execute(self, context):
+        snowflake_hook = SnowflakeHook(
+            snowflake_conn_id=self.snowflake_conn_id,
+            warehouse=self.warehouse,
+            database=self.database,
+            role=self.role,
+            authenticator=self.authenticator,
+            session_parameters=self.session_parameters,
+        )
+        unload_options = '\n\t'.join(self.unload_options)
+
+        if self.stage:
+            unload_query = f"""
+            COPY INTO @{self.stage}/{self.s3_key}
+            FROM ({self.query_or_table})
+            """
+
+            if self.file_format:
+                unload_query += f'FILE_FORMAT = ({self.file_format})'
+
+        else:
+            unload_query = f"""
+            COPY INTO 's3://{self.s3_bucket}/{self.s3_key}'
+            FROM ({self.query_or_table})
+            STORAGE_INTEGRATION = S3
+            FILE_FORMAT = ({self.file_format})
+            """
+
+        unload_query += f"""{unload_options}
+                            HEADER = {self.include_header};"""

Review comment:
       @turbaszek I previously handled this with an `unindent_auto` function just for SQL. If you think this would be helpful in the repo, please suggest a location and I'll make a PR.
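
The implementation of `unindent_auto` is not shown in this thread. As a stand-in, the standard library's `textwrap.dedent` gives a similar effect; the function below is a sketch under that assumption, and its name and parameters are illustrative.

```python
from textwrap import dedent


def build_stage_unload_query(stage: str, path: str, query: str, include_header: bool) -> str:
    """Build the COPY INTO statement without leaking source-code indentation."""
    header = "TRUE" if include_header else "FALSE"
    # dedent strips the common leading whitespace after interpolation, so the
    # interpolated values should not contain unindented newlines themselves.
    return dedent(
        f"""\
        COPY INTO @{stage}/{path}
        FROM ({query})
        HEADER = {header};
        """
    )
```

This keeps the SQL readable in the source while producing output that starts at column zero.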



