You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/09/11 08:52:06 UTC

[GitHub] [beam] purbanow opened a new pull request #12823: [BEAM-10882] Update Snowflake doc

purbanow opened a new pull request #12823:
URL: https://github.com/apache/beam/pull/12823


   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [x] Update `CHANGES.md` with noteworthy changes.
    - [x] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) | ---
   Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/i
 con)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](htt
 ps://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
   Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_
 Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_P
 ostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/) | ---
   XLang | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/) | ---
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website | Whitespace | Typescript
   --- | --- | --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/) <br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/be
 am_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/)
   Portable | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | --- | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r494764556



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.

Review comment:
       Good to know that! Done.

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r494788392



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+- `username and password` Specifies username and password for username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user's [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | <SOURCE OF DATA>
+       | WriteToSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           create_disposition=<CREATE DISPOSITION>,
+           write_disposition=<WRITE DISPOSITION>,
+           table_schema=<SNOWFLAKE TABLE SCHEMA>,
+           user_data_mapper=<USER DATA MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+#### Required parameters
+
+- `server_name` Full Snowflake server name with account, zone and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Path to Google Cloud Storage bucket ended with slash. Bucket will be used to save CSV files which will end up in Snowflake. Those CSV files will be saved under “staging_bucket_name” path.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `user_data_mapper` Specifies a function which  maps data from a PCollection to an array of String values before the write operation saves the data to temporary .csv files.
+Example:
+{{< highlight >}}
+def user_data_mapper(user):
+    return [user.name, str(user.age)]
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service` Specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combination of valid parameters for authentication:
+
+- `username and password` Specifies username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+- `create_disposition` Defines the behaviour of the write operation if the target table does not exist. The following values are supported:
+  - CREATE_IF_NEEDED - default behaviour. The write operation checks whether the specified target table exists; if it does not, the write operation attempts to create the table Specify the schema for the target table using the table_schema parameter.
+  - CREATE_NEVER -  The write operation fails if the target table does not exist.
+
+- `write_disposition` Defines the write behaviour based on the table where data will be written to. The following values are supported:
+  - APPEND - Default behaviour. Written data is added to the existing rows in the table,
+  - EMPTY - The target table must be empty;  otherwise, the write operation fails,
+  - TRUNCATE - The write operation deletes all rows from the target table before writing to it.
+
+- `table_schema` When the create_disposition parameter is set to CREATE_IF_NEEDED, the table_schema  parameter  enables specifying the schema for the created target table. A table schema is as JSON with the following structure:
+{{< highlight >}}
+{"schema":[
+    {
+      "dataType":{"type":"<COLUMN DATA TYPE>"},
+      "name":"<COLUMN  NAME> ",
+      "nullable": <NULLABLE>
+    },
+        ...
+  ]}
+{{< /highlight >}}
+All supported data types:
+{{< highlight >}}
+{"dataType":{"type":"date"},"name":"","nullable":false},
+{"dataType":{"type":"datetime"},"name":"","nullable":false},
+{"dataType":{"type":"time"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_ltz"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_ntz"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_tz"},"name":"","nullable":false},
+{"dataType":{"type":"boolean"},"name":"","nullable":false},
+{"dataType":{"type":"decimal","precision":38,"scale":1},"name":"","nullable":true},
+{"dataType":{"type":"double"},"name":"","nullable":false},
+{"dataType":{"type":"float"},"name":"","nullable":false},
+{"dataType":{"type":"integer","precision":38,"scale":0},"name":"","nullable":false},
+{"dataType":{"type":"number","precision":38,"scale":1},"name":"","nullable":false},
+{"dataType":{"type":"numeric","precision":40,"scale":2},"name":"","nullable":false},
+{"dataType":{"type":"real"},"name":"","nullable":false},
+{"dataType":{"type":"array"},"name":"","nullable":false},
+{"dataType":{"type":"object"},"name":"","nullable":false},
+{"dataType":{"type":"variant"},"name":"","nullable":true},
+{"dataType":{"type":"binary","size":null},"name":"","nullable":false},
+{"dataType":{"type":"char","length":1},"name":"","nullable":false},
+{"dataType":{"type":"string","length":null},"name":"","nullable":false},
+{"dataType":{"type":"text","length":null},"name":"","nullable":false},
+{"dataType":{"type":"varbinary","size":null},"name":"","nullable":false},
+{"dataType":{"type":"varchar","length":100},"name":"","nullable":false}]

Review comment:
       I agree. I found 2 of them to be true but the overall json schema is written above so just the dataType json object will be sufficient and much clearer.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12823:
URL: https://github.com/apache/beam/pull/12823#issuecomment-698767496


   @TheNeuralBit I've done the suggested changes.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r492540501



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,206 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Cross Language

Review comment:
       @pabloem @TheNeuralBit WDYT?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] TheNeuralBit commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r496278227



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.

Review comment:
       +1, thanks




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r492540501



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,206 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Cross Language

Review comment:
       @pabloem @TheNeuralBit WDYT?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r488745935



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,206 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Cross Language

Review comment:
       I wonder if this should be called 'Using SnowflakeIO in Python SDK' - what do you think? Users may not care if it's cross-lang, but rather that it works in their language of choice. Thoughts?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] TheNeuralBit commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r488776570



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,206 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Cross Language

Review comment:
       +1
   
   It's worth having a note somewhere like "SnowflakeIO in Python can only be used on runners that support cross-language transforms". I'm wary of listing specific runners here since it will get out of date. It seems we should have a central location that lists runners that support cross-language, seems like a good thing to have on the compatibility matrix.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r494779590



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+- `username and password` Specifies username and password for username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user's [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) to your Snowflake database.
+#### General usage
+{{< highlight >}}

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r494782338



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+

Review comment:
       Yeah, it used to be when the expansion services weren't picked up automatically. Removed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] TheNeuralBit commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r493921269



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.

Review comment:
       Similarly on the `WriteToSnowflake` side we can inspect user types and infer a schema from it




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r494783145



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+- `username and password` Specifies username and password for username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user's [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | <SOURCE OF DATA>
+       | WriteToSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           create_disposition=<CREATE DISPOSITION>,
+           write_disposition=<WRITE DISPOSITION>,
+           table_schema=<SNOWFLAKE TABLE SCHEMA>,
+           user_data_mapper=<USER DATA MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r494768476



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.

Review comment:
       Of course it is. Done for both read and write.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
purbanow commented on pull request #12823:
URL: https://github.com/apache/beam/pull/12823#issuecomment-692653308


   @TheNeuralBit will you find a moment for a CR?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r494788041



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+- `username and password` Specifies username and password for username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user's [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | <SOURCE OF DATA>
+       | WriteToSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           create_disposition=<CREATE DISPOSITION>,
+           write_disposition=<WRITE DISPOSITION>,
+           table_schema=<SNOWFLAKE TABLE SCHEMA>,
+           user_data_mapper=<USER DATA MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+#### Required parameters
+
+- `server_name` Full Snowflake server name with account, zone and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Path to Google Cloud Storage bucket ended with slash. Bucket will be used to save CSV files which will end up in Snowflake. Those CSV files will be saved under “staging_bucket_name” path.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `user_data_mapper` Specifies a function which  maps data from a PCollection to an array of String values before the write operation saves the data to temporary .csv files.
+Example:
+{{< highlight >}}
+def user_data_mapper(user):
+    return [user.name, str(user.age)]
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service` Specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combination of valid parameters for authentication:
+
+- `username and password` Specifies username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+- `create_disposition` Defines the behaviour of the write operation if the target table does not exist. The following values are supported:
+  - CREATE_IF_NEEDED - default behaviour. The write operation checks whether the specified target table exists; if it does not, the write operation attempts to create the table Specify the schema for the target table using the table_schema parameter.
+  - CREATE_NEVER -  The write operation fails if the target table does not exist.
+
+- `write_disposition` Defines the write behaviour based on the table where data will be written to. The following values are supported:
+  - APPEND - Default behaviour. Written data is added to the existing rows in the table,
+  - EMPTY - The target table must be empty;  otherwise, the write operation fails,
+  - TRUNCATE - The write operation deletes all rows from the target table before writing to it.
+
+- `table_schema` When the create_disposition parameter is set to CREATE_IF_NEEDED, the table_schema  parameter  enables specifying the schema for the created target table. A table schema is as JSON with the following structure:

Review comment:
       Done. I added a link to https://docs.snowflake.com/en/sql-reference/data-types.html




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r494784053



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+- `username and password` Specifies username and password for username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user's [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | <SOURCE OF DATA>
+       | WriteToSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           create_disposition=<CREATE DISPOSITION>,
+           write_disposition=<WRITE DISPOSITION>,
+           table_schema=<SNOWFLAKE TABLE SCHEMA>,
+           user_data_mapper=<USER DATA MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,

Review comment:
       Done.

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] TheNeuralBit merged pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
TheNeuralBit merged pull request #12823:
URL: https://github.com/apache/beam/pull/12823


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r494791059



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.

Review comment:
       I think it was considered but at the time of writing this connector Rows and Schemas were not well known and the csv + user mapper were already implemented. It would definitely be a nice feature to be done in the future.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
purbanow commented on pull request #12823:
URL: https://github.com/apache/beam/pull/12823#issuecomment-691014169






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r494788891



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -204,14 +370,121 @@ Then:
 **Note**:
 SnowflakeIO uses COPY statements behind the scenes to write (using [COPY to table](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-table.html)). StagingBucketName will be used to save CSV files which will end up in Snowflake. Those CSV files will be saved under the “stagingBucketName” path.
 
+**Optional** for batching:
+- `.withQuotationMark()`
+  - Default value: `‘` (single quotation mark).
+  - Accepts String with one character. It will surround all text (String) fields saved to CSV. It should be one of the accepted characters by [Snowflake’s](https://docs.snowflake.com/en/sql-reference/sql/create-file-format.html) [FIELD_OPTIONALLY_ENCLOSED_BY](https://docs.snowflake.com/en/sql-reference/sql/create-file-format.html) parameter (double quotation mark, single quotation mark or none).
+  - Example: `.withQuotationMark("'")`
+### Streaming write  (from unbounded source)
+It is required to create a [SnowPipe](https://docs.snowflake.com/en/user-guide/data-load-snowpipe.html) in the Snowflake console. SnowPipe should use the same integration and the same bucket as specified by .withStagingBucketName and .withStorageIntegrationName methods. The write operation might look as follows:
+{{< highlight java >}}
+data.apply(
+   SnowflakeIO.<type>write()
+      .withStagingBucketName("BUCKET NAME")
+      .withStorageIntegrationName("STORAGE INTEGRATION NAME")
+      .withDataSourceConfiguration(dc)
+      .withUserDataMapper(mapper)
+      .withSnowPipe("MY_SNOW_PIPE")
+      .withFlushTimeLimit(Duration.millis(time))
+      .withFlushRowLimit(rowsNumber)
+      .withShardsNumber(shardsNumber)
+)
+{{< /highlight >}}
+#### Parameters
+**Required** for streaming:
+
+- ` .withDataSourceConfiguration()`
+  - Accepts a DatasourceConfiguration object.
+
+- `.toTable()`
+  - Accepts the target Snowflake table name.
+  - Example: `.toTable("MY_TABLE)`
+
+- `.withStagingBucketName()`
+  - Accepts a cloud bucket path ended with slash.
+  - Example: `.withStagingBucketName("gs://mybucket/my/dir/")`
+
+- `.withStorageIntegrationName()`
+  - Accepts a name of a Snowflake storage integration object created according to Snowflake documentationt.
+  - Example:
+{{< highlight >}}
+CREATE OR REPLACE STORAGE INTEGRATION test_integration
+TYPE = EXTERNAL_STAGE
+STORAGE_PROVIDER = GCS
+ENABLED = TRUE
+STORAGE_ALLOWED_LOCATIONS = ('gcs://bucket/');
+{{< /highlight >}}
+Then:
+{{< highlight >}}
+.withStorageIntegrationName(test_integration)
+{{< /highlight >}}
+
+- `.withSnowPipe()`
+  - Accepts the target SnowPipe name. `.withSnowPipe()` accepts the exact name of snowpipe.
+Example:
+{{< highlight >}}
+CREATE OR REPLACE PIPE test_database.public.test_gcs_pipe
+AS COPY INTO stream_table from @streamstage;
+{{< /highlight >}}
+
+   - Then:
+{{< highlight >}}
+.withSnowPipe(test_gcs_pipe)
+{{< /highlight >}}
+
+**Note**: this is important to provide **schema** and **database** names.
+- `.withUserDataMapper()`
+  - Accepts the [UserDataMapper](https://beam.apache.org/documentation/io/built-in/snowflake/#userdatamapper-function) function that will map a user's PCollection to an array of String values `(String[]).`
+
+**Note**:
+
+SnowflakeIO uses COPY statements behind the scenes to write (using [COPY to table](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-table.html)). StagingBucketName will be used to save CSV files which will end up in Snowflake. Those CSV files will be saved under the “stagingBucketName” path.
+
+**Optional** for streaming:
+- `.withFlushTimeLimit()`
+  - Default value: 30 seconds
+  - Accepts Duration objects with the specified time after each the streaming write will be repeated
+  - Example: `.withFlushTimeLimit(Duration.millis(180000))`
+
+- `.withFlushRowLimit()`
+  - Default value: 10,000 rows
+  - Limit of rows written to each file staged file

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r494770305



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query

Review comment:
       Done for this and the rest of such cases.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
purbanow commented on pull request #12823:
URL: https://github.com/apache/beam/pull/12823#issuecomment-691014169






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r494781949



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] TheNeuralBit commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r493908171



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.

Review comment:
       ```suggestion
   - `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/current/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
   ```
   
   All of the javadoc and pydoc links should refer to `current` instead of a specific version number

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query

Review comment:
       nit: please make sure only the argument names are code formatted when you are referencing two different args, e.g.:
   
   ```suggestion
   - `table` or `query` Specifies a Snowflake table name or custom SQL query
   ```

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+- `username and password` Specifies username and password for username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user's [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) to your Snowflake database.
+#### General usage
+{{< highlight >}}

Review comment:
       ```suggestion
   {{< highlight py >}}
   ```
   
   Please specify `py` or `java` in all the relevant code blocks so that syntax is properly highlighted

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+- `username and password` Specifies username and password for username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user's [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | <SOURCE OF DATA>
+       | WriteToSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           create_disposition=<CREATE DISPOSITION>,
+           write_disposition=<WRITE DISPOSITION>,
+           table_schema=<SNOWFLAKE TABLE SCHEMA>,
+           user_data_mapper=<USER DATA MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}

Review comment:
       These exampels can drop the pipeline options and instead just focus on applying the transform to the pipeline, e.g.:
   ```py
   (p | <SOURCE>
      | WriteToSnowflake(...)
   ```

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+- `username and password` Specifies username and password for username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user's [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | <SOURCE OF DATA>
+       | WriteToSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           create_disposition=<CREATE DISPOSITION>,
+           write_disposition=<WRITE DISPOSITION>,
+           table_schema=<SNOWFLAKE TABLE SCHEMA>,
+           user_data_mapper=<USER DATA MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+#### Required parameters
+
+- `server_name` Full Snowflake server name with account, zone and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Path to Google Cloud Storage bucket ended with slash. Bucket will be used to save CSV files which will end up in Snowflake. Those CSV files will be saved under “staging_bucket_name” path.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `user_data_mapper` Specifies a function which  maps data from a PCollection to an array of String values before the write operation saves the data to temporary .csv files.
+Example:
+{{< highlight >}}
+def user_data_mapper(user):
+    return [user.name, str(user.age)]
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service` Specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combination of valid parameters for authentication:
+
+- `username and password` Specifies username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+- `create_disposition` Defines the behaviour of the write operation if the target table does not exist. The following values are supported:
+  - CREATE_IF_NEEDED - default behaviour. The write operation checks whether the specified target table exists; if it does not, the write operation attempts to create the table Specify the schema for the target table using the table_schema parameter.
+  - CREATE_NEVER -  The write operation fails if the target table does not exist.
+
+- `write_disposition` Defines the write behaviour based on the table where data will be written to. The following values are supported:
+  - APPEND - Default behaviour. Written data is added to the existing rows in the table,
+  - EMPTY - The target table must be empty;  otherwise, the write operation fails,
+  - TRUNCATE - The write operation deletes all rows from the target table before writing to it.
+
+- `table_schema` When the create_disposition parameter is set to CREATE_IF_NEEDED, the table_schema  parameter  enables specifying the schema for the created target table. A table schema is as JSON with the following structure:

Review comment:
       Is there documentation we can refer to about how all of these types are encoded in CSV?

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+- `username and password` Specifies username and password for username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user's [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | <SOURCE OF DATA>
+       | WriteToSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           create_disposition=<CREATE DISPOSITION>,
+           write_disposition=<WRITE DISPOSITION>,
+           table_schema=<SNOWFLAKE TABLE SCHEMA>,
+           user_data_mapper=<USER DATA MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+#### Required parameters
+
+- `server_name` Full Snowflake server name with account, zone and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Path to Google Cloud Storage bucket ended with slash. Bucket will be used to save CSV files which will end up in Snowflake. Those CSV files will be saved under “staging_bucket_name” path.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `user_data_mapper` Specifies a function which  maps data from a PCollection to an array of String values before the write operation saves the data to temporary .csv files.
+Example:
+{{< highlight >}}
+def user_data_mapper(user):
+    return [user.name, str(user.age)]
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service` Specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combination of valid parameters for authentication:
+
+- `username and password` Specifies username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+- `create_disposition` Defines the behaviour of the write operation if the target table does not exist. The following values are supported:
+  - CREATE_IF_NEEDED - default behaviour. The write operation checks whether the specified target table exists; if it does not, the write operation attempts to create the table Specify the schema for the target table using the table_schema parameter.
+  - CREATE_NEVER -  The write operation fails if the target table does not exist.
+
+- `write_disposition` Defines the write behaviour based on the table where data will be written to. The following values are supported:
+  - APPEND - Default behaviour. Written data is added to the existing rows in the table,
+  - EMPTY - The target table must be empty;  otherwise, the write operation fails,
+  - TRUNCATE - The write operation deletes all rows from the target table before writing to it.
+
+- `table_schema` When the create_disposition parameter is set to CREATE_IF_NEEDED, the table_schema  parameter  enables specifying the schema for the created target table. A table schema is as JSON with the following structure:
+{{< highlight >}}
+{"schema":[
+    {
+      "dataType":{"type":"<COLUMN DATA TYPE>"},
+      "name":"<COLUMN  NAME> ",
+      "nullable": <NULLABLE>
+    },
+        ...
+  ]}
+{{< /highlight >}}
+All supported data types:
+{{< highlight >}}
+{"dataType":{"type":"date"},"name":"","nullable":false},
+{"dataType":{"type":"datetime"},"name":"","nullable":false},
+{"dataType":{"type":"time"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_ltz"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_ntz"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_tz"},"name":"","nullable":false},
+{"dataType":{"type":"boolean"},"name":"","nullable":false},
+{"dataType":{"type":"decimal","precision":38,"scale":1},"name":"","nullable":true},
+{"dataType":{"type":"double"},"name":"","nullable":false},
+{"dataType":{"type":"float"},"name":"","nullable":false},
+{"dataType":{"type":"integer","precision":38,"scale":0},"name":"","nullable":false},
+{"dataType":{"type":"number","precision":38,"scale":1},"name":"","nullable":false},
+{"dataType":{"type":"numeric","precision":40,"scale":2},"name":"","nullable":false},
+{"dataType":{"type":"real"},"name":"","nullable":false},
+{"dataType":{"type":"array"},"name":"","nullable":false},
+{"dataType":{"type":"object"},"name":"","nullable":false},
+{"dataType":{"type":"variant"},"name":"","nullable":true},
+{"dataType":{"type":"binary","size":null},"name":"","nullable":false},
+{"dataType":{"type":"char","length":1},"name":"","nullable":false},
+{"dataType":{"type":"string","length":null},"name":"","nullable":false},
+{"dataType":{"type":"text","length":null},"name":"","nullable":false},
+{"dataType":{"type":"varbinary","size":null},"name":"","nullable":false},
+{"dataType":{"type":"varchar","length":100},"name":"","nullable":false}]

Review comment:
       Does the fact that all of these have `"nullable":false` indicate nulls aren't supported? This could be more concise if you just include the "dataType" object, e.g.:
   ```suggestion
   {"type":"varchar","length":100}
   ```

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.

Review comment:
       ```suggestion
   One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/current/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
   ```

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+- `username and password` Specifies username and password for username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.

Review comment:
       ```suggestion
   - `warehouse` specifies Snowflake warehouse name. If not specified the user's default will be used.
   ```

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+

Review comment:
       ```suggestion
   ```
   
   I don't think this is necessary

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,

Review comment:
       ```suggestion
              table=<SNOWFLAKE TABLE>,
   ```

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+- `username and password` Specifies username and password for username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.

Review comment:
       ```suggestion
   - `role` specifies Snowflake role. If not specified the user's default will be used.
   ```

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.

Review comment:
       This page shouldn't state Flink is the only runner the supports cross-language, instead please refer users to https://beam.apache.org/roadmap/connectors-multi-sdk/#cross-language-transforms-api-and-expansion-service for information about which runners support it.

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.

Review comment:
       This is optional right? In other cross-language transforms we will download the appropriate expansion service jar and start it for you if a URL isn't specified.

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -204,14 +370,121 @@ Then:
 **Note**:
 SnowflakeIO uses COPY statements behind the scenes to write (using [COPY to table](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-table.html)). StagingBucketName will be used to save CSV files which will end up in Snowflake. Those CSV files will be saved under the “stagingBucketName” path.
 
+**Optional** for batching:
+- `.withQuotationMark()`
+  - Default value: `‘` (single quotation mark).
+  - Accepts String with one character. It will surround all text (String) fields saved to CSV. It should be one of the accepted characters by [Snowflake’s](https://docs.snowflake.com/en/sql-reference/sql/create-file-format.html) [FIELD_OPTIONALLY_ENCLOSED_BY](https://docs.snowflake.com/en/sql-reference/sql/create-file-format.html) parameter (double quotation mark, single quotation mark or none).
+  - Example: `.withQuotationMark("'")`
+### Streaming write  (from unbounded source)
+It is required to create a [SnowPipe](https://docs.snowflake.com/en/user-guide/data-load-snowpipe.html) in the Snowflake console. SnowPipe should use the same integration and the same bucket as specified by .withStagingBucketName and .withStorageIntegrationName methods. The write operation might look as follows:
+{{< highlight java >}}
+data.apply(
+   SnowflakeIO.<type>write()
+      .withStagingBucketName("BUCKET NAME")
+      .withStorageIntegrationName("STORAGE INTEGRATION NAME")
+      .withDataSourceConfiguration(dc)
+      .withUserDataMapper(mapper)
+      .withSnowPipe("MY_SNOW_PIPE")
+      .withFlushTimeLimit(Duration.millis(time))
+      .withFlushRowLimit(rowsNumber)
+      .withShardsNumber(shardsNumber)
+)
+{{< /highlight >}}
+#### Parameters
+**Required** for streaming:
+
+- ` .withDataSourceConfiguration()`
+  - Accepts a DatasourceConfiguration object.
+
+- `.toTable()`
+  - Accepts the target Snowflake table name.
+  - Example: `.toTable("MY_TABLE)`
+
+- `.withStagingBucketName()`
+  - Accepts a cloud bucket path ended with slash.
+  - Example: `.withStagingBucketName("gs://mybucket/my/dir/")`
+
+- `.withStorageIntegrationName()`
+  - Accepts a name of a Snowflake storage integration object created according to Snowflake documentationt.
+  - Example:
+{{< highlight >}}
+CREATE OR REPLACE STORAGE INTEGRATION test_integration
+TYPE = EXTERNAL_STAGE
+STORAGE_PROVIDER = GCS
+ENABLED = TRUE
+STORAGE_ALLOWED_LOCATIONS = ('gcs://bucket/');
+{{< /highlight >}}
+Then:
+{{< highlight >}}
+.withStorageIntegrationName(test_integration)
+{{< /highlight >}}
+
+- `.withSnowPipe()`
+  - Accepts the target SnowPipe name. `.withSnowPipe()` accepts the exact name of snowpipe.
+Example:
+{{< highlight >}}
+CREATE OR REPLACE PIPE test_database.public.test_gcs_pipe
+AS COPY INTO stream_table from @streamstage;
+{{< /highlight >}}
+
+   - Then:
+{{< highlight >}}
+.withSnowPipe(test_gcs_pipe)
+{{< /highlight >}}
+
+**Note**: this is important to provide **schema** and **database** names.
+- `.withUserDataMapper()`
+  - Accepts the [UserDataMapper](https://beam.apache.org/documentation/io/built-in/snowflake/#userdatamapper-function) function that will map a user's PCollection to an array of String values `(String[]).`
+
+**Note**:
+
+SnowflakeIO uses COPY statements behind the scenes to write (using [COPY to table](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-table.html)). StagingBucketName will be used to save CSV files which will end up in Snowflake. Those CSV files will be saved under the “stagingBucketName” path.
+
+**Optional** for streaming:
+- `.withFlushTimeLimit()`
+  - Default value: 30 seconds
+  - Accepts Duration objects with the specified time after each the streaming write will be repeated
+  - Example: `.withFlushTimeLimit(Duration.millis(180000))`
+
+- `.withFlushRowLimit()`
+  - Default value: 10,000 rows
+  - Limit of rows written to each file staged file

Review comment:
       ```suggestion
     - Limit of rows written to each staged file
   ```

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+- `username and password` Specifies username and password for username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user's [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | <SOURCE OF DATA>
+       | WriteToSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           create_disposition=<CREATE DISPOSITION>,
+           write_disposition=<WRITE DISPOSITION>,
+           table_schema=<SNOWFLAKE TABLE SCHEMA>,
+           user_data_mapper=<USER DATA MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,

Review comment:
       ```suggestion
              table=<SNOWFLAKE TABLE>,
   ```

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+- `username and password` Specifies username and password for username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user's [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | <SOURCE OF DATA>
+       | WriteToSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           create_disposition=<CREATE DISPOSITION>,
+           write_disposition=<WRITE DISPOSITION>,
+           table_schema=<SNOWFLAKE TABLE SCHEMA>,
+           user_data_mapper=<USER DATA MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+#### Required parameters
+
+- `server_name` Full Snowflake server name with account, zone and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Path to Google Cloud Storage bucket ended with slash. Bucket will be used to save CSV files which will end up in Snowflake. Those CSV files will be saved under “staging_bucket_name” path.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `user_data_mapper` Specifies a function which  maps data from a PCollection to an array of String values before the write operation saves the data to temporary .csv files.
+Example:
+{{< highlight >}}
+def user_data_mapper(user):
+    return [user.name, str(user.age)]
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service` Specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combination of valid parameters for authentication:
+
+- `username and password` Specifies username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+- `create_disposition` Defines the behaviour of the write operation if the target table does not exist. The following values are supported:
+  - CREATE_IF_NEEDED - default behaviour. The write operation checks whether the specified target table exists; if it does not, the write operation attempts to create the table Specify the schema for the target table using the table_schema parameter.
+  - CREATE_NEVER -  The write operation fails if the target table does not exist.
+
+- `write_disposition` Defines the write behaviour based on the table where data will be written to. The following values are supported:
+  - APPEND - Default behaviour. Written data is added to the existing rows in the table,
+  - EMPTY - The target table must be empty;  otherwise, the write operation fails,
+  - TRUNCATE - The write operation deletes all rows from the target table before writing to it.
+
+- `table_schema` When the create_disposition parameter is set to CREATE_IF_NEEDED, the table_schema  parameter  enables specifying the schema for the created target table. A table schema is as JSON with the following structure:

Review comment:
       ```suggestion
   - `table_schema` When the `create_disposition` parameter is set to CREATE_IF_NEEDED, the table_schema  parameter  enables specifying the schema for the created target table. A table schema is a JSON array with the following structure:
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r494788392



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+- `username and password` Specifies username and password for username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user's [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | <SOURCE OF DATA>
+       | WriteToSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           create_disposition=<CREATE DISPOSITION>,
+           write_disposition=<WRITE DISPOSITION>,
+           table_schema=<SNOWFLAKE TABLE SCHEMA>,
+           user_data_mapper=<USER DATA MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+#### Required parameters
+
+- `server_name` Full Snowflake server name with account, zone and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Path to Google Cloud Storage bucket ended with slash. Bucket will be used to save CSV files which will end up in Snowflake. Those CSV files will be saved under “staging_bucket_name” path.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `user_data_mapper` Specifies a function which  maps data from a PCollection to an array of String values before the write operation saves the data to temporary .csv files.
+Example:
+{{< highlight >}}
+def user_data_mapper(user):
+    return [user.name, str(user.age)]
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service` Specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combination of valid parameters for authentication:
+
+- `username and password` Specifies username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+- `create_disposition` Defines the behaviour of the write operation if the target table does not exist. The following values are supported:
+  - CREATE_IF_NEEDED - default behaviour. The write operation checks whether the specified target table exists; if it does not, the write operation attempts to create the table Specify the schema for the target table using the table_schema parameter.
+  - CREATE_NEVER -  The write operation fails if the target table does not exist.
+
+- `write_disposition` Defines the write behaviour based on the table where data will be written to. The following values are supported:
+  - APPEND - Default behaviour. Written data is added to the existing rows in the table,
+  - EMPTY - The target table must be empty;  otherwise, the write operation fails,
+  - TRUNCATE - The write operation deletes all rows from the target table before writing to it.
+
+- `table_schema` When the create_disposition parameter is set to CREATE_IF_NEEDED, the table_schema  parameter  enables specifying the schema for the created target table. A table schema is as JSON with the following structure:
+{{< highlight >}}
+{"schema":[
+    {
+      "dataType":{"type":"<COLUMN DATA TYPE>"},
+      "name":"<COLUMN  NAME> ",
+      "nullable": <NULLABLE>
+    },
+        ...
+  ]}
+{{< /highlight >}}
+All supported data types:
+{{< highlight >}}
+{"dataType":{"type":"date"},"name":"","nullable":false},
+{"dataType":{"type":"datetime"},"name":"","nullable":false},
+{"dataType":{"type":"time"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_ltz"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_ntz"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_tz"},"name":"","nullable":false},
+{"dataType":{"type":"boolean"},"name":"","nullable":false},
+{"dataType":{"type":"decimal","precision":38,"scale":1},"name":"","nullable":true},
+{"dataType":{"type":"double"},"name":"","nullable":false},
+{"dataType":{"type":"float"},"name":"","nullable":false},
+{"dataType":{"type":"integer","precision":38,"scale":0},"name":"","nullable":false},
+{"dataType":{"type":"number","precision":38,"scale":1},"name":"","nullable":false},
+{"dataType":{"type":"numeric","precision":40,"scale":2},"name":"","nullable":false},
+{"dataType":{"type":"real"},"name":"","nullable":false},
+{"dataType":{"type":"array"},"name":"","nullable":false},
+{"dataType":{"type":"object"},"name":"","nullable":false},
+{"dataType":{"type":"variant"},"name":"","nullable":true},
+{"dataType":{"type":"binary","size":null},"name":"","nullable":false},
+{"dataType":{"type":"char","length":1},"name":"","nullable":false},
+{"dataType":{"type":"string","length":null},"name":"","nullable":false},
+{"dataType":{"type":"text","length":null},"name":"","nullable":false},
+{"dataType":{"type":"varbinary","size":null},"name":"","nullable":false},
+{"dataType":{"type":"varchar","length":100},"name":"","nullable":false}]

Review comment:
       I agree. I found 2 of nullables to be true but the overall json schema is written above so just the dataType json object will be sufficient and much clearer.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
purbanow commented on pull request #12823:
URL: https://github.com/apache/beam/pull/12823#issuecomment-691014169


   @pabloem will you find a moment for a CR?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r489460105



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,206 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Cross Language

Review comment:
       I changed to `Using SnowflakeIO in Python SDK'` and updated a little intro. 
   
   About the second, maybe lets create an issue for this? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] TheNeuralBit commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r493907866



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.

Review comment:
       Have you considered using Beam Schemas and Rows here instead of having users provide a CSV mapper? If the SnowflakeIO Read transform produces a PCollection with a schema, then on the Python side we will generate a class that lets users access each field by attribute.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r494764556



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.

Review comment:
       Good to know that! Done.

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.

Review comment:
       Done.

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.

Review comment:
       Of course it is. Done for both read and write.

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query

Review comment:
       Done for this and the rest of such cases.

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+- `username and password` Specifies username and password for username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.

Review comment:
       Done.

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+- `username and password` Specifies username and password for username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.

Review comment:
       Done.

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+- `username and password` Specifies username and password for username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user's [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) to your Snowflake database.
+#### General usage
+{{< highlight >}}

Review comment:
       Done.

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.

Review comment:
       Done.

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+

Review comment:
       Yeah, it used to be when the expansion services weren't picked up automatically. Removed.

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+- `username and password` Specifies username and password for username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user's [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | <SOURCE OF DATA>
+       | WriteToSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           create_disposition=<CREATE DISPOSITION>,
+           write_disposition=<WRITE DISPOSITION>,
+           table_schema=<SNOWFLAKE TABLE SCHEMA>,
+           user_data_mapper=<USER DATA MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}

Review comment:
       Done.

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+- `username and password` Specifies username and password for username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user's [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | <SOURCE OF DATA>
+       | WriteToSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           create_disposition=<CREATE DISPOSITION>,
+           write_disposition=<WRITE DISPOSITION>,
+           table_schema=<SNOWFLAKE TABLE SCHEMA>,
+           user_data_mapper=<USER DATA MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,

Review comment:
       Done.

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,

Review comment:
       Done.

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+- `username and password` Specifies username and password for username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user's [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | <SOURCE OF DATA>
+       | WriteToSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           create_disposition=<CREATE DISPOSITION>,
+           write_disposition=<WRITE DISPOSITION>,
+           table_schema=<SNOWFLAKE TABLE SCHEMA>,
+           user_data_mapper=<USER DATA MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+#### Required parameters
+
+- `server_name` Full Snowflake server name with account, zone and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Path to Google Cloud Storage bucket ended with slash. Bucket will be used to save CSV files which will end up in Snowflake. Those CSV files will be saved under “staging_bucket_name” path.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `user_data_mapper` Specifies a function which  maps data from a PCollection to an array of String values before the write operation saves the data to temporary .csv files.
+Example:
+{{< highlight >}}
+def user_data_mapper(user):
+    return [user.name, str(user.age)]
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service` Specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combination of valid parameters for authentication:
+
+- `username and password` Specifies username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+- `create_disposition` Defines the behaviour of the write operation if the target table does not exist. The following values are supported:
+  - CREATE_IF_NEEDED - default behaviour. The write operation checks whether the specified target table exists; if it does not, the write operation attempts to create the table Specify the schema for the target table using the table_schema parameter.
+  - CREATE_NEVER -  The write operation fails if the target table does not exist.
+
+- `write_disposition` Defines the write behaviour based on the table where data will be written to. The following values are supported:
+  - APPEND - Default behaviour. Written data is added to the existing rows in the table,
+  - EMPTY - The target table must be empty;  otherwise, the write operation fails,
+  - TRUNCATE - The write operation deletes all rows from the target table before writing to it.
+
+- `table_schema` When the create_disposition parameter is set to CREATE_IF_NEEDED, the table_schema  parameter  enables specifying the schema for the created target table. A table schema is as JSON with the following structure:

Review comment:
       Done. I added a link to https://docs.snowflake.com/en/sql-reference/data-types.html

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+- `username and password` Specifies username and password for username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user's [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | <SOURCE OF DATA>
+       | WriteToSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           create_disposition=<CREATE DISPOSITION>,
+           write_disposition=<WRITE DISPOSITION>,
+           table_schema=<SNOWFLAKE TABLE SCHEMA>,
+           user_data_mapper=<USER DATA MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+#### Required parameters
+
+- `server_name` Full Snowflake server name with account, zone and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Path to Google Cloud Storage bucket ended with slash. Bucket will be used to save CSV files which will end up in Snowflake. Those CSV files will be saved under “staging_bucket_name” path.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `user_data_mapper` Specifies a function which  maps data from a PCollection to an array of String values before the write operation saves the data to temporary .csv files.
+Example:
+{{< highlight >}}
+def user_data_mapper(user):
+    return [user.name, str(user.age)]
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service` Specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combination of valid parameters for authentication:
+
+- `username and password` Specifies username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+- `create_disposition` Defines the behaviour of the write operation if the target table does not exist. The following values are supported:
+  - CREATE_IF_NEEDED - default behaviour. The write operation checks whether the specified target table exists; if it does not, the write operation attempts to create the table Specify the schema for the target table using the table_schema parameter.
+  - CREATE_NEVER -  The write operation fails if the target table does not exist.
+
+- `write_disposition` Defines the write behaviour based on the table where data will be written to. The following values are supported:
+  - APPEND - Default behaviour. Written data is added to the existing rows in the table,
+  - EMPTY - The target table must be empty;  otherwise, the write operation fails,
+  - TRUNCATE - The write operation deletes all rows from the target table before writing to it.
+
+- `table_schema` When the create_disposition parameter is set to CREATE_IF_NEEDED, the table_schema  parameter  enables specifying the schema for the created target table. A table schema is as JSON with the following structure:
+{{< highlight >}}
+{"schema":[
+    {
+      "dataType":{"type":"<COLUMN DATA TYPE>"},
+      "name":"<COLUMN  NAME> ",
+      "nullable": <NULLABLE>
+    },
+        ...
+  ]}
+{{< /highlight >}}
+All supported data types:
+{{< highlight >}}
+{"dataType":{"type":"date"},"name":"","nullable":false},
+{"dataType":{"type":"datetime"},"name":"","nullable":false},
+{"dataType":{"type":"time"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_ltz"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_ntz"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_tz"},"name":"","nullable":false},
+{"dataType":{"type":"boolean"},"name":"","nullable":false},
+{"dataType":{"type":"decimal","precision":38,"scale":1},"name":"","nullable":true},
+{"dataType":{"type":"double"},"name":"","nullable":false},
+{"dataType":{"type":"float"},"name":"","nullable":false},
+{"dataType":{"type":"integer","precision":38,"scale":0},"name":"","nullable":false},
+{"dataType":{"type":"number","precision":38,"scale":1},"name":"","nullable":false},
+{"dataType":{"type":"numeric","precision":40,"scale":2},"name":"","nullable":false},
+{"dataType":{"type":"real"},"name":"","nullable":false},
+{"dataType":{"type":"array"},"name":"","nullable":false},
+{"dataType":{"type":"object"},"name":"","nullable":false},
+{"dataType":{"type":"variant"},"name":"","nullable":true},
+{"dataType":{"type":"binary","size":null},"name":"","nullable":false},
+{"dataType":{"type":"char","length":1},"name":"","nullable":false},
+{"dataType":{"type":"string","length":null},"name":"","nullable":false},
+{"dataType":{"type":"text","length":null},"name":"","nullable":false},
+{"dataType":{"type":"varbinary","size":null},"name":"","nullable":false},
+{"dataType":{"type":"varchar","length":100},"name":"","nullable":false}]

Review comment:
       I agree. I found 2 of them to be true but the overall json schema is written above so just the dataType json object will be sufficient and much clearer.

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+- `username and password` Specifies username and password for username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+### Writing to Snowflake
+One of the functions of SnowflakeIO is writing to Snowflake tables. This transformation enables you to finish the Beam pipeline with an output operation that sends the user's [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) to your Snowflake database.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | <SOURCE OF DATA>
+       | WriteToSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           create_disposition=<CREATE DISPOSITION>,
+           write_disposition=<WRITE DISPOSITION>,
+           table_schema=<SNOWFLAKE TABLE SCHEMA>,
+           user_data_mapper=<USER DATA MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+#### Required parameters
+
+- `server_name` Full Snowflake server name with account, zone and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Path to Google Cloud Storage bucket ended with slash. Bucket will be used to save CSV files which will end up in Snowflake. Those CSV files will be saved under “staging_bucket_name” path.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `user_data_mapper` Specifies a function which  maps data from a PCollection to an array of String values before the write operation saves the data to temporary .csv files.
+Example:
+{{< highlight >}}
+def user_data_mapper(user):
+    return [user.name, str(user.age)]
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service` Specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combination of valid parameters for authentication:
+
+- `username and password` Specifies username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.
+
+- `create_disposition` Defines the behaviour of the write operation if the target table does not exist. The following values are supported:
+  - CREATE_IF_NEEDED - default behaviour. The write operation checks whether the specified target table exists; if it does not, the write operation attempts to create the table Specify the schema for the target table using the table_schema parameter.
+  - CREATE_NEVER -  The write operation fails if the target table does not exist.
+
+- `write_disposition` Defines the write behaviour based on the table where data will be written to. The following values are supported:
+  - APPEND - Default behaviour. Written data is added to the existing rows in the table,
+  - EMPTY - The target table must be empty;  otherwise, the write operation fails,
+  - TRUNCATE - The write operation deletes all rows from the target table before writing to it.
+
+- `table_schema` When the create_disposition parameter is set to CREATE_IF_NEEDED, the table_schema  parameter  enables specifying the schema for the created target table. A table schema is as JSON with the following structure:
+{{< highlight >}}
+{"schema":[
+    {
+      "dataType":{"type":"<COLUMN DATA TYPE>"},
+      "name":"<COLUMN  NAME> ",
+      "nullable": <NULLABLE>
+    },
+        ...
+  ]}
+{{< /highlight >}}
+All supported data types:
+{{< highlight >}}
+{"dataType":{"type":"date"},"name":"","nullable":false},
+{"dataType":{"type":"datetime"},"name":"","nullable":false},
+{"dataType":{"type":"time"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_ltz"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_ntz"},"name":"","nullable":false},
+{"dataType":{"type":"timestamp_tz"},"name":"","nullable":false},
+{"dataType":{"type":"boolean"},"name":"","nullable":false},
+{"dataType":{"type":"decimal","precision":38,"scale":1},"name":"","nullable":true},
+{"dataType":{"type":"double"},"name":"","nullable":false},
+{"dataType":{"type":"float"},"name":"","nullable":false},
+{"dataType":{"type":"integer","precision":38,"scale":0},"name":"","nullable":false},
+{"dataType":{"type":"number","precision":38,"scale":1},"name":"","nullable":false},
+{"dataType":{"type":"numeric","precision":40,"scale":2},"name":"","nullable":false},
+{"dataType":{"type":"real"},"name":"","nullable":false},
+{"dataType":{"type":"array"},"name":"","nullable":false},
+{"dataType":{"type":"object"},"name":"","nullable":false},
+{"dataType":{"type":"variant"},"name":"","nullable":true},
+{"dataType":{"type":"binary","size":null},"name":"","nullable":false},
+{"dataType":{"type":"char","length":1},"name":"","nullable":false},
+{"dataType":{"type":"string","length":null},"name":"","nullable":false},
+{"dataType":{"type":"text","length":null},"name":"","nullable":false},
+{"dataType":{"type":"varbinary","size":null},"name":"","nullable":false},
+{"dataType":{"type":"varchar","length":100},"name":"","nullable":false}]

Review comment:
       I agree. I found 2 of nullables to be true but the overall json schema is written above so just the dataType json object will be sufficient and much clearer.

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -204,14 +370,121 @@ Then:
 **Note**:
 SnowflakeIO uses COPY statements behind the scenes to write (using [COPY to table](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-table.html)). StagingBucketName will be used to save CSV files which will end up in Snowflake. Those CSV files will be saved under the “stagingBucketName” path.
 
+**Optional** for batching:
+- `.withQuotationMark()`
+  - Default value: `‘` (single quotation mark).
+  - Accepts String with one character. It will surround all text (String) fields saved to CSV. It should be one of the accepted characters by [Snowflake’s](https://docs.snowflake.com/en/sql-reference/sql/create-file-format.html) [FIELD_OPTIONALLY_ENCLOSED_BY](https://docs.snowflake.com/en/sql-reference/sql/create-file-format.html) parameter (double quotation mark, single quotation mark or none).
+  - Example: `.withQuotationMark("'")`
+### Streaming write  (from unbounded source)
+It is required to create a [SnowPipe](https://docs.snowflake.com/en/user-guide/data-load-snowpipe.html) in the Snowflake console. SnowPipe should use the same integration and the same bucket as specified by .withStagingBucketName and .withStorageIntegrationName methods. The write operation might look as follows:
+{{< highlight java >}}
+data.apply(
+   SnowflakeIO.<type>write()
+      .withStagingBucketName("BUCKET NAME")
+      .withStorageIntegrationName("STORAGE INTEGRATION NAME")
+      .withDataSourceConfiguration(dc)
+      .withUserDataMapper(mapper)
+      .withSnowPipe("MY_SNOW_PIPE")
+      .withFlushTimeLimit(Duration.millis(time))
+      .withFlushRowLimit(rowsNumber)
+      .withShardsNumber(shardsNumber)
+)
+{{< /highlight >}}
+#### Parameters
+**Required** for streaming:
+
+- ` .withDataSourceConfiguration()`
+  - Accepts a DatasourceConfiguration object.
+
+- `.toTable()`
+  - Accepts the target Snowflake table name.
+  - Example: `.toTable("MY_TABLE)`
+
+- `.withStagingBucketName()`
+  - Accepts a cloud bucket path ended with slash.
+  - Example: `.withStagingBucketName("gs://mybucket/my/dir/")`
+
+- `.withStorageIntegrationName()`
+  - Accepts a name of a Snowflake storage integration object created according to Snowflake documentationt.
+  - Example:
+{{< highlight >}}
+CREATE OR REPLACE STORAGE INTEGRATION test_integration
+TYPE = EXTERNAL_STAGE
+STORAGE_PROVIDER = GCS
+ENABLED = TRUE
+STORAGE_ALLOWED_LOCATIONS = ('gcs://bucket/');
+{{< /highlight >}}
+Then:
+{{< highlight >}}
+.withStorageIntegrationName(test_integration)
+{{< /highlight >}}
+
+- `.withSnowPipe()`
+  - Accepts the target SnowPipe name. `.withSnowPipe()` accepts the exact name of snowpipe.
+Example:
+{{< highlight >}}
+CREATE OR REPLACE PIPE test_database.public.test_gcs_pipe
+AS COPY INTO stream_table from @streamstage;
+{{< /highlight >}}
+
+   - Then:
+{{< highlight >}}
+.withSnowPipe(test_gcs_pipe)
+{{< /highlight >}}
+
+**Note**: this is important to provide **schema** and **database** names.
+- `.withUserDataMapper()`
+  - Accepts the [UserDataMapper](https://beam.apache.org/documentation/io/built-in/snowflake/#userdatamapper-function) function that will map a user's PCollection to an array of String values `(String[]).`
+
+**Note**:
+
+SnowflakeIO uses COPY statements behind the scenes to write (using [COPY to table](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-table.html)). StagingBucketName will be used to save CSV files which will end up in Snowflake. Those CSV files will be saved under the “stagingBucketName” path.
+
+**Optional** for streaming:
+- `.withFlushTimeLimit()`
+  - Default value: 30 seconds
+  - Accepts Duration objects with the specified time after each the streaming write will be repeated
+  - Example: `.withFlushTimeLimit(Duration.millis(180000))`
+
+- `.withFlushRowLimit()`
+  - Default value: 10,000 rows
+  - Limit of rows written to each file staged file

Review comment:
       Done.

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.

Review comment:
       I think it was considered but at the time of writing this connector Rows and Schemas were not well known and the csv + user mapper were already implemented. It would definitely be a nice feature to be done in the future.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on pull request #12823:
URL: https://github.com/apache/beam/pull/12823#issuecomment-698767496


   @TheNeuralBit I've done the suggested changes.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] piotr-szuberski commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
piotr-szuberski commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r494770682



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+- `username and password` Specifies username and password for username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.

Review comment:
       Done.

##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,208 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Using SnowflakeIO in Python SDK
+### Intro
+Snowflake cross-language implementation is supporting both reading and writing operations for Python programming language, thanks to
+cross-language which is part of [Portability Framework Roadmap](https://beam.apache.org/roadmap/portability/) which aims to provide full interoperability
+across the Beam ecosystem. From a developer perspective it means the possibility of combining transforms written in different languages(Java/Python/Go).
+
+Currently, cross-language is supporting only [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
+For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
+and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+
+### Set up
+Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup.
+
+### Reading from Snowflake
+One of the functions of SnowflakeIO is reading Snowflake tables - either full tables via table name or custom data via query. Output of the read transform is a [PCollection](https://beam.apache.org/releases/pydoc/2.20.0/apache_beam.pvalue.html#apache_beam.pvalue.PCollection) of user-defined data type.
+#### General usage
+{{< highlight >}}
+OPTIONS = [
+   "--runner=FlinkRunner",
+   "--flink_version=1.10",
+   "--flink_master=localhost:8081",
+   "--environment_type=LOOPBACK"
+]
+
+with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
+   (p
+       | ReadFromSnowflake(
+           server_name=<SNOWFLAKE SERVER NAME>,
+           username=<SNOWFLAKE USERNAME>,
+           password=<SNOWFLAKE PASSWORD>,
+           o_auth_token=<OAUTH TOKEN>,
+           private_key_path=<PATH TO P8 FILE>,
+           raw_private_key=<PRIVATE_KEY>
+           private_key_passphrase=<PASSWORD FOR KEY>,
+           schema=<SNOWFLAKE SCHEMA>,
+           database=<SNOWFLAKE DATABASE>,
+           staging_bucket_name=<GCS BUCKET NAME>,
+           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
+           csv_mapper=<CSV MAPPER FUNCTION>,
+           table=<SNOWFALKE TABLE>,
+           query=<IF NOT TABLE THEN QUERY>,
+           role=<SNOWFLAKE ROLE>,
+           warehouse=<SNOWFLAKE WAREHOUSE>,
+           expansion_service=<EXPANSION SERVICE ADDRESS>))
+{{< /highlight >}}
+
+#### Required parameters
+- `server_name` Full Snowflake server name with an account, zone, and domain.
+
+- `schema` Name of the Snowflake schema in the database to use.
+
+- `database` Name of the Snowflake database to use.
+
+- `staging_bucket_name` Name of the Google Cloud Storage bucket. Bucket will be used as a temporary location for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` and they will be removed automatically once Read operation finishes.
+
+- `storage_integration_name` Is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.net/manuals/sql-reference/sql/create-storage-integration.html).
+
+- `csv_mapper` Specifies a function which must translate user-defined object to array of strings. SnowflakeIO uses a [COPY INTO <location>](https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-location.html) statement to move data from a Snowflake table to Google Cloud Storage as CSV files. These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. The csv_mapper function job is to give the user the possibility to convert the array of Strings to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects.
+Example:
+{{< highlight >}}
+def csv_mapper(strings_array):
+    return User(strings_array[0], int(strings_array[1])))
+{{< /highlight >}}
+
+- `table or query` Specifies a Snowflake table name or custom SQL query
+
+- `expansion_service`: specifies URL of expansion service.
+
+#### Authentication parameters
+It’s required to pass one of the following combinations of valid parameters for authentication:
+- `username and password` Specifies username and password for username/password authentication method.
+
+- `private_key_path and private_key_passphrase` Specifies a path to private key and passphrase for key/pair authentication method.
+
+- `raw_private_key and private_key_passphrase` Specifies a private key and passphrase for key/pair authentication method.
+
+- `o_auth_token` Specifies access token for OAuth authentication method.
+
+#### Additional parameters
+- `role`: specifies Snowflake role. If not specified then the user's default will be used.
+
+- `warehouse`: specifies Snowflake warehouse name. If not specified then the user's default will be used.

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on a change in pull request #12823: [BEAM-10882] Update Snowflake docs

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #12823:
URL: https://github.com/apache/beam/pull/12823#discussion_r489460105



##########
File path: website/www/site/content/en/documentation/io/built-in/snowflake.md
##########
@@ -362,3 +635,206 @@ static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
            };
 }
 {{< /highlight >}}
+## Cross Language

Review comment:
       I changed to `Using SnowflakeIO in Python SDK'` and updated a little intro. 
   
   About the second maybe lets create an issue for this? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org