Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/24 17:31:51 UTC

[GitHub] [beam] aromanenko-dev commented on a change in pull request #11360: [BEAM-9722] added SnowflakeIO with Read operation

aromanenko-dev commented on a change in pull request #11360:
URL: https://github.com/apache/beam/pull/11360#discussion_r414715651



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import static org.apache.beam.sdk.io.TextIO.readFiles;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.opencsv.CSVParser;
+import com.opencsv.CSVParserBuilder;
+import java.io.IOException;
+import java.io.Serializable;
+import java.security.PrivateKey;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+import javax.sql.DataSource;
+import net.snowflake.client.jdbc.SnowflakeBasicDataSource;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.snowflake.credentials.KeyPairSnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.OAuthTokenSnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.UsernamePasswordSnowflakeCredentials;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Wait;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * IO to read and write data on Snowflake.
+ *
+ * <p>SnowflakeIO uses <a href="https://docs.snowflake.net/manuals/user-guide/jdbc.html">Snowflake
+ * JDBC</a> driver under the hood, but data isn't read/written using JDBC directly. Instead,
+ * SnowflakeIO uses dedicated <b>COPY</b> operations to read/write data from/to Google Cloud
+ * Storage.
+ *
+ * <p>To configure SnowflakeIO to read/write from your Snowflake instance, you have to provide a
+ * {@link DataSourceConfiguration} using {@link
+ * DataSourceConfiguration#create(SnowflakeCredentials)}, where {@link SnowflakeCredentials} might be
+ * created using {@link org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory}.
+ * Additionally one of {@link DataSourceConfiguration#withServerName(String)} or {@link
+ * DataSourceConfiguration#withUrl(String)} must be used to tell SnowflakeIO which instance to use.
+ * <br>
+ * There are also other options available to configure connection to Snowflake:
+ *
+ * <ul>
+ *   <li>{@link DataSourceConfiguration#withWarehouse(String)} to specify which Warehouse to use
+ *   <li>{@link DataSourceConfiguration#withDatabase(String)} to specify which Database to connect
+ *       to
+ *   <li>{@link DataSourceConfiguration#withSchema(String)} to specify which schema to use
+ *   <li>{@link DataSourceConfiguration#withRole(String)} to specify which role to use
+ *   <li>{@link DataSourceConfiguration#withLoginTimeout(Integer)} to specify the timeout for the
+ *       login
+ *   <li>{@link DataSourceConfiguration#withPortNumber(Integer)} to specify custom port of Snowflake
+ *       instance
+ * </ul>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * SnowflakeIO.DataSourceConfiguration dataSourceConfiguration =
+ *     SnowflakeIO.DataSourceConfiguration.create(SnowflakeCredentialsFactory.of(options))
+ *         .withServerName(options.getServerName())
+ *         .withWarehouse(options.getWarehouse())
+ *         .withDatabase(options.getDatabase())
+ *         .withSchema(options.getSchema());
+ * }</pre>
+ *
+ * <h3>Reading from Snowflake</h3>
+ *
+ * <p>SnowflakeIO.Read returns a bounded collection of {@code T} as a {@code PCollection<T>}. T is
+ * the type returned by the provided {@link CsvMapper}.
+ *
+ * <p>For example
+ *
+ * <pre>{@code
+ * PCollection<GenericRecord> items = pipeline.apply(
+ *  SnowflakeIO.<GenericRecord>read()
+ *    .withDataSourceConfiguration(dataSourceConfiguration)
+ *    .fromQuery(QUERY)
+ *    .withStagingBucketName(stagingBucketName)
+ *    .withIntegrationName(integrationName)
+ *    .withCsvMapper(...)
+ *    .withCoder(...));
+ * }</pre>
+ */
+public class SnowflakeIO {
+  private static final Logger LOG = LoggerFactory.getLogger(SnowflakeIO.class);
+
+  private static final String CSV_QUOTE_CHAR = "'";
+
+  /**
+   * Read data from Snowflake via COPY statement via user-defined {@link SnowflakeService}.
+   *
+   * @param snowflakeService user-defined {@link SnowflakeService}
+   * @param <T> Type of the data to be read.
+   */
+  public static <T> Read<T> read(
+      SnowflakeService snowflakeService, SnowflakeCloudProvider snowflakeCloudProvider) {
+    return new AutoValue_SnowflakeIO_Read.Builder<T>()
+        .setSnowflakeService(snowflakeService)
+        .setSnowflakeCloudProvider(snowflakeCloudProvider)
+        .build();
+  }
+
+  /**
+   * Read data from Snowflake via COPY statement via default {@link SnowflakeServiceImpl}.
+   *
+   * @param <T> Type of the data to be read.
+   */
+  public static <T> Read<T> read() {
+    return read(new SnowflakeServiceImpl(), new GCSProvider());
+  }
+
+  /**
+   * Interface for user-defined function mapping parts of CSV line into T. Used for
+   * SnowflakeIO.Read.
+   *
+   * @param <T> Type of data to be read.
+   */
+  @FunctionalInterface
+  public interface CsvMapper<T> extends Serializable {
+    T mapRow(String[] parts) throws Exception;
+  }
+
+  /** Implementation of {@link #read()}. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getIntegrationName();
+
+    @Nullable
+    abstract String getStagingBucketName();
+
+    @Nullable
+    abstract CsvMapper<T> getCsvMapper();
+
+    @Nullable
+    abstract Coder<T> getCoder();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    @Nullable
+    abstract SnowflakeCloudProvider getSnowflakeCloudProvider();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setIntegrationName(String integrationName);
+
+      abstract Builder<T> setStagingBucketName(String stagingBucketName);
+
+      abstract Builder<T> setCsvMapper(CsvMapper<T> csvMapper);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Builder<T> setSnowflakeCloudProvider(SnowflakeCloudProvider snowflakeCloudProvider);
+
+      abstract Read<T> build();
+    }
+
+    public Read<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+
+      try {
+        Connection connection = config.buildDatasource().getConnection();

Review comment:
       Does it establish a connection over a network? 
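
       For context: on a JDBC DataSource such as SnowflakeBasicDataSource, getConnection() does perform a network login, so the check below runs eagerly at pipeline-construction time. A minimal sketch (illustrative only, not the PR's code; it relies on the imports already in the quoted file) of deferring that validation into the provider function, so it happens on the workers instead:

           // Illustrative sketch: validate lazily inside the SerializableFunction so that
           // no network call is made while the pipeline graph is being built.
           SerializableFunction<Void, DataSource> lazyProvider =
               input -> {
                 DataSource ds = config.buildDatasource();
                 try (Connection ignored = ds.getConnection()) {
                   // getConnection() is where the actual login to Snowflake happens.
                 } catch (SQLException e) {
                   throw new IllegalArgumentException("Invalid DataSourceConfiguration", e);
                 }
                 return ds;
               };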

##########
File path: CHANGES.md
##########
@@ -28,7 +28,8 @@
 
 ## I/Os
 
-* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* Support for reading from Snowflake added (Java) ([BEAM-9722](https://issues.apache.org/jira/browse/BEAM-9722)).

Review comment:
       Please move it to the `[2.22.0] - Unreleased` section (I guess you need to rebase to have it).

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import static org.apache.beam.sdk.io.TextIO.readFiles;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.opencsv.CSVParser;
+import com.opencsv.CSVParserBuilder;
+import java.io.IOException;
+import java.io.Serializable;
+import java.security.PrivateKey;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+import javax.sql.DataSource;
+import net.snowflake.client.jdbc.SnowflakeBasicDataSource;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.snowflake.credentials.KeyPairSnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.OAuthTokenSnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.UsernamePasswordSnowflakeCredentials;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Wait;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * IO to read and write data on Snowflake.
+ *
+ * <p>SnowflakeIO uses <a href="https://docs.snowflake.net/manuals/user-guide/jdbc.html">Snowflake
+ * JDBC</a> driver under the hood, but data isn't read/written using JDBC directly. Instead,
+ * SnowflakeIO uses dedicated <b>COPY</b> operations to read/write data from/to Google Cloud
+ * Storage.
+ *
+ * <p>To configure SnowflakeIO to read/write from your Snowflake instance, you have to provide a
+ * {@link DataSourceConfiguration} using {@link
+ * DataSourceConfiguration#create(SnowflakeCredentials)}, where {@link SnowflakeCredentials} might be
+ * created using {@link org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory}.
+ * Additionally one of {@link DataSourceConfiguration#withServerName(String)} or {@link
+ * DataSourceConfiguration#withUrl(String)} must be used to tell SnowflakeIO which instance to use.
+ * <br>
+ * There are also other options available to configure connection to Snowflake:
+ *
+ * <ul>
+ *   <li>{@link DataSourceConfiguration#withWarehouse(String)} to specify which Warehouse to use
+ *   <li>{@link DataSourceConfiguration#withDatabase(String)} to specify which Database to connect
+ *       to
+ *   <li>{@link DataSourceConfiguration#withSchema(String)} to specify which schema to use
+ *   <li>{@link DataSourceConfiguration#withRole(String)} to specify which role to use
+ *   <li>{@link DataSourceConfiguration#withLoginTimeout(Integer)} to specify the timeout for the
+ *       login
+ *   <li>{@link DataSourceConfiguration#withPortNumber(Integer)} to specify custom port of Snowflake
+ *       instance
+ * </ul>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * SnowflakeIO.DataSourceConfiguration dataSourceConfiguration =
+ *     SnowflakeIO.DataSourceConfiguration.create(SnowflakeCredentialsFactory.of(options))
+ *         .withServerName(options.getServerName())
+ *         .withWarehouse(options.getWarehouse())
+ *         .withDatabase(options.getDatabase())
+ *         .withSchema(options.getSchema());
+ * }</pre>
+ *
+ * <h3>Reading from Snowflake</h3>
+ *
+ * <p>SnowflakeIO.Read returns a bounded collection of {@code T} as a {@code PCollection<T>}. T is
+ * the type returned by the provided {@link CsvMapper}.
+ *
+ * <p>For example
+ *
+ * <pre>{@code
+ * PCollection<GenericRecord> items = pipeline.apply(
+ *  SnowflakeIO.<GenericRecord>read()
+ *    .withDataSourceConfiguration(dataSourceConfiguration)
+ *    .fromQuery(QUERY)
+ *    .withStagingBucketName(stagingBucketName)
+ *    .withIntegrationName(integrationName)
+ *    .withCsvMapper(...)
+ *    .withCoder(...));
+ * }</pre>
+ */
+public class SnowflakeIO {
+  private static final Logger LOG = LoggerFactory.getLogger(SnowflakeIO.class);
+
+  private static final String CSV_QUOTE_CHAR = "'";
+
+  /**
+   * Read data from Snowflake via COPY statement via user-defined {@link SnowflakeService}.
+   *
+   * @param snowflakeService user-defined {@link SnowflakeService}
+   * @param <T> Type of the data to be read.
+   */
+  public static <T> Read<T> read(
+      SnowflakeService snowflakeService, SnowflakeCloudProvider snowflakeCloudProvider) {
+    return new AutoValue_SnowflakeIO_Read.Builder<T>()
+        .setSnowflakeService(snowflakeService)
+        .setSnowflakeCloudProvider(snowflakeCloudProvider)
+        .build();
+  }
+
+  /**
+   * Read data from Snowflake via COPY statement via default {@link SnowflakeServiceImpl}.
+   *
+   * @param <T> Type of the data to be read.
+   */
+  public static <T> Read<T> read() {
+    return read(new SnowflakeServiceImpl(), new GCSProvider());
+  }
+
+  /**
+   * Interface for user-defined function mapping parts of CSV line into T. Used for
+   * SnowflakeIO.Read.
+   *
+   * @param <T> Type of data to be read.
+   */
+  @FunctionalInterface
+  public interface CsvMapper<T> extends Serializable {
+    T mapRow(String[] parts) throws Exception;
+  }
+
+  /** Implementation of {@link #read()}. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getIntegrationName();
+
+    @Nullable
+    abstract String getStagingBucketName();
+
+    @Nullable
+    abstract CsvMapper<T> getCsvMapper();
+
+    @Nullable
+    abstract Coder<T> getCoder();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    @Nullable
+    abstract SnowflakeCloudProvider getSnowflakeCloudProvider();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setIntegrationName(String integrationName);
+
+      abstract Builder<T> setStagingBucketName(String stagingBucketName);
+
+      abstract Builder<T> setCsvMapper(CsvMapper<T> csvMapper);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Builder<T> setSnowflakeCloudProvider(SnowflakeCloudProvider snowflakeCloudProvider);
+
+      abstract Read<T> build();
+    }
+
+    public Read<T> withDataSourceConfiguration(final DataSourceConfiguration config) {

Review comment:
       Please add Javadoc for all exposed "with..." methods, since they are part of the public API of this IO.
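
       As a sketch of the kind of Javadoc this asks for (wording is illustrative, not taken from the PR; the method bodies match the quoted code):

           /**
            * Sets the query to be executed on Snowflake; mutually exclusive with {@link #fromTable(String)}.
            *
            * @param query the SQL query whose result is exported via COPY and then read back
            */
           public Read<T> fromQuery(String query) {
             return toBuilder().setQuery(query).build();
           }

           /**
            * Sets the bucket used as a temporary staging location for the files produced by COPY.
            *
            * @param stagingBucketName name of the staging bucket, without a scheme prefix
            */
           public Read<T> withStagingBucketName(String stagingBucketName) {
             return toBuilder().setStagingBucketName(stagingBucketName).build();
           }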

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import static org.apache.beam.sdk.io.TextIO.readFiles;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.opencsv.CSVParser;
+import com.opencsv.CSVParserBuilder;
+import java.io.IOException;
+import java.io.Serializable;
+import java.security.PrivateKey;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+import javax.sql.DataSource;
+import net.snowflake.client.jdbc.SnowflakeBasicDataSource;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.snowflake.credentials.KeyPairSnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.OAuthTokenSnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.UsernamePasswordSnowflakeCredentials;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Wait;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * IO to read and write data on Snowflake.
+ *
+ * <p>SnowflakeIO uses <a href="https://docs.snowflake.net/manuals/user-guide/jdbc.html">Snowflake
+ * JDBC</a> driver under the hood, but data isn't read/written using JDBC directly. Instead,
+ * SnowflakeIO uses dedicated <b>COPY</b> operations to read/write data from/to Google Cloud
+ * Storage.
+ *
+ * <p>To configure SnowflakeIO to read/write from your Snowflake instance, you have to provide a
+ * {@link DataSourceConfiguration} using {@link
+ * DataSourceConfiguration#create(SnowflakeCredentials)}, where {@link SnowflakeCredentials} might be
+ * created using {@link org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory}.
+ * Additionally one of {@link DataSourceConfiguration#withServerName(String)} or {@link
+ * DataSourceConfiguration#withUrl(String)} must be used to tell SnowflakeIO which instance to use.
+ * <br>
+ * There are also other options available to configure connection to Snowflake:
+ *
+ * <ul>
+ *   <li>{@link DataSourceConfiguration#withWarehouse(String)} to specify which Warehouse to use
+ *   <li>{@link DataSourceConfiguration#withDatabase(String)} to specify which Database to connect
+ *       to
+ *   <li>{@link DataSourceConfiguration#withSchema(String)} to specify which schema to use
+ *   <li>{@link DataSourceConfiguration#withRole(String)} to specify which role to use
+ *   <li>{@link DataSourceConfiguration#withLoginTimeout(Integer)} to specify the timeout for the
+ *       login
+ *   <li>{@link DataSourceConfiguration#withPortNumber(Integer)} to specify custom port of Snowflake
+ *       instance
+ * </ul>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * SnowflakeIO.DataSourceConfiguration dataSourceConfiguration =
+ *     SnowflakeIO.DataSourceConfiguration.create(SnowflakeCredentialsFactory.of(options))
+ *         .withServerName(options.getServerName())
+ *         .withWarehouse(options.getWarehouse())
+ *         .withDatabase(options.getDatabase())
+ *         .withSchema(options.getSchema());
+ * }</pre>
+ *
+ * <h3>Reading from Snowflake</h3>
+ *
+ * <p>SnowflakeIO.Read returns a bounded collection of {@code T} as a {@code PCollection<T>}. T is
+ * the type returned by the provided {@link CsvMapper}.
+ *
+ * <p>For example
+ *
+ * <pre>{@code
+ * PCollection<GenericRecord> items = pipeline.apply(
+ *  SnowflakeIO.<GenericRecord>read()
+ *    .withDataSourceConfiguration(dataSourceConfiguration)
+ *    .fromQuery(QUERY)
+ *    .withStagingBucketName(stagingBucketName)
+ *    .withIntegrationName(integrationName)
+ *    .withCsvMapper(...)
+ *    .withCoder(...));
+ * }</pre>
+ */
+public class SnowflakeIO {
+  private static final Logger LOG = LoggerFactory.getLogger(SnowflakeIO.class);
+
+  private static final String CSV_QUOTE_CHAR = "'";
+
+  /**
+   * Read data from Snowflake via COPY statement via user-defined {@link SnowflakeService}.
+   *
+   * @param snowflakeService user-defined {@link SnowflakeService}
+   * @param <T> Type of the data to be read.
+   */
+  public static <T> Read<T> read(
+      SnowflakeService snowflakeService, SnowflakeCloudProvider snowflakeCloudProvider) {
+    return new AutoValue_SnowflakeIO_Read.Builder<T>()
+        .setSnowflakeService(snowflakeService)
+        .setSnowflakeCloudProvider(snowflakeCloudProvider)
+        .build();
+  }
+
+  /**
+   * Read data from Snowflake via COPY statement via default {@link SnowflakeServiceImpl}.
+   *
+   * @param <T> Type of the data to be read.
+   */
+  public static <T> Read<T> read() {
+    return read(new SnowflakeServiceImpl(), new GCSProvider());
+  }
+
+  /**
+   * Interface for user-defined function mapping parts of CSV line into T. Used for
+   * SnowflakeIO.Read.
+   *
+   * @param <T> Type of data to be read.
+   */
+  @FunctionalInterface
+  public interface CsvMapper<T> extends Serializable {
+    T mapRow(String[] parts) throws Exception;
+  }
+
+  /** Implementation of {@link #read()}. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getIntegrationName();
+
+    @Nullable
+    abstract String getStagingBucketName();
+
+    @Nullable
+    abstract CsvMapper<T> getCsvMapper();
+
+    @Nullable
+    abstract Coder<T> getCoder();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    @Nullable
+    abstract SnowflakeCloudProvider getSnowflakeCloudProvider();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setIntegrationName(String integrationName);
+
+      abstract Builder<T> setStagingBucketName(String stagingBucketName);
+
+      abstract Builder<T> setCsvMapper(CsvMapper<T> csvMapper);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Builder<T> setSnowflakeCloudProvider(SnowflakeCloudProvider snowflakeCloudProvider);
+
+      abstract Read<T> build();
+    }
+
+    public Read<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+
+      try {
+        Connection connection = config.buildDatasource().getConnection();
+        connection.close();
+      } catch (SQLException e) {
+        throw new IllegalArgumentException(
+            "Invalid DataSourceConfiguration. Underlying cause: " + e);
+      }
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    public Read<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    public Read<T> fromQuery(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    public Read<T> fromTable(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    public Read<T> withStagingBucketName(String stagingBucketName) {
+      return toBuilder().setStagingBucketName(stagingBucketName).build();
+    }
+
+    public Read<T> withIntegrationName(String integrationName) {
+      return toBuilder().setIntegrationName(integrationName).build();
+    }
+
+    public Read<T> withCsvMapper(CsvMapper<T> csvMapper) {
+      return toBuilder().setCsvMapper(csvMapper).build();
+    }
+
+    public Read<T> withCoder(Coder<T> coder) {
+      return toBuilder().setCoder(coder).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      // Either table or query is required. If query is present, it's being used, table is used
+      // otherwise
+      checkArgument(
+          getQuery() != null || getTable() != null, "fromTable() or fromQuery() is required");
+      checkArgument(
+          !(getQuery() != null && getTable() != null),
+          "fromTable() and fromQuery() is not allowed together");
+      checkArgument(getCsvMapper() != null, "withCsvMapper() is required");
+      checkArgument(getCoder() != null, "withCoder() is required");
+      checkArgument(getIntegrationName() != null, "withIntegrationName() is required");
+      checkArgument(getStagingBucketName() != null, "withStagingBucketName() is required");
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      String gcpTmpDirName = makeTmpDirName();
+      PCollection<Void> emptyCollection = input.apply(Create.of((Void) null));
+
+      PCollection<T> output =
+          emptyCollection
+              .apply(
+                  ParDo.of(
+                      new CopyIntoStageFn(
+                          getDataSourceProviderFn(),
+                          getQuery(),
+                          getTable(),
+                          getIntegrationName(),
+                          getStagingBucketName(),
+                          gcpTmpDirName,
+                          getSnowflakeService())))
+              .apply(FileIO.matchAll())
+              .apply(FileIO.readMatches())
+              .apply(readFiles())
+              .apply(ParDo.of(new MapCsvToStringArrayFn()))
+              .apply(ParDo.of(new MapStringArrayToUserDataFn<>(getCsvMapper())));
+
+      output.setCoder(getCoder());
+
+      emptyCollection
+          .apply(Wait.on(output))
+          .apply(
+              ParDo.of(
+                  new CleanTmpFilesFromGcsFn(

Review comment:
       Will the temp directory be cleaned up if the pipeline failed earlier?
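
       For reference, in the quoted expand() the cleanup ParDo is gated by Wait.on(output), so it never runs if an upstream step fails. One purely illustrative mitigation (the helper and the provider methods it calls are hypothetical, not part of the PR) is a best-effort sweep of stale staging directories before the next read:

           // Hypothetical sketch only: delete staging directories older than a TTL,
           // guarding against temp files left behind by previously failed pipelines.
           static void cleanStaleTmpDirs(SnowflakeCloudProvider provider, String bucket, long ttlMillis) {
             long cutoff = System.currentTimeMillis() - ttlMillis;
             for (String dir : provider.listTmpDirs(bucket)) {        // assumed method
               if (provider.creationTime(bucket, dir) < cutoff) {     // assumed method
                 provider.removeFiles(bucket, dir);                   // assumed method
               }
             }
           }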

##########
File path: sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/package-info.java
##########
@@ -0,0 +1,2 @@
+/** Snowflake IO tests. */

Review comment:
       Add ASF license header

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import static org.apache.beam.sdk.io.TextIO.readFiles;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.opencsv.CSVParser;
+import com.opencsv.CSVParserBuilder;
+import java.io.IOException;
+import java.io.Serializable;
+import java.security.PrivateKey;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+import javax.sql.DataSource;
+import net.snowflake.client.jdbc.SnowflakeBasicDataSource;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.snowflake.credentials.KeyPairSnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.OAuthTokenSnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.UsernamePasswordSnowflakeCredentials;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Wait;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * IO to read and write data on Snowflake.
+ *
+ * <p>SnowflakeIO uses <a href="https://docs.snowflake.net/manuals/user-guide/jdbc.html">Snowflake
+ * JDBC</a> driver under the hood, but data isn't read/written using JDBC directly. Instead,
+ * SnowflakeIO uses dedicated <b>COPY</b> operations to read/write data from/to Google Cloud
+ * Storage.
+ *
+ * <p>To configure SnowflakeIO to read/write from your Snowflake instance, you have to provide a
+ * {@link DataSourceConfiguration} using {@link
+ * DataSourceConfiguration#create(SnowflakeCredentials)}, where {@link SnowflakeCredentials} might be
+ * created using {@link org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory}.
+ * Additionally one of {@link DataSourceConfiguration#withServerName(String)} or {@link
+ * DataSourceConfiguration#withUrl(String)} must be used to tell SnowflakeIO which instance to use.
+ * <br>
+ * There are also other options available to configure connection to Snowflake:
+ *
+ * <ul>
+ *   <li>{@link DataSourceConfiguration#withWarehouse(String)} to specify which Warehouse to use
+ *   <li>{@link DataSourceConfiguration#withDatabase(String)} to specify which Database to connect
+ *       to
+ *   <li>{@link DataSourceConfiguration#withSchema(String)} to specify which schema to use
+ *   <li>{@link DataSourceConfiguration#withRole(String)} to specify which role to use
+ *   <li>{@link DataSourceConfiguration#withLoginTimeout(Integer)} to specify the timeout for the
+ *       login
+ *   <li>{@link DataSourceConfiguration#withPortNumber(Integer)} to specify custom port of Snowflake
+ *       instance
+ * </ul>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * SnowflakeIO.DataSourceConfiguration dataSourceConfiguration =
+ *     SnowflakeIO.DataSourceConfiguration.create(SnowflakeCredentialsFactory.of(options))
+ *         .withServerName(options.getServerName())
+ *         .withWarehouse(options.getWarehouse())
+ *         .withDatabase(options.getDatabase())
+ *         .withSchema(options.getSchema());
+ * }</pre>
+ *
+ * <h3>Reading from Snowflake</h3>
+ *
+ * <p>SnowflakeIO.Read returns a bounded collection of {@code T} as a {@code PCollection<T>}. T is
+ * the type returned by the provided {@link CsvMapper}.
+ *
+ * <p>For example
+ *
+ * <pre>{@code
+ * PCollection<GenericRecord> items = pipeline.apply(
+ *  SnowflakeIO.<GenericRecord>read()
+ *    .withDataSourceConfiguration(dataSourceConfiguration)
+ *    .fromQuery(QUERY)
+ *    .withStagingBucketName(stagingBucketName)
+ *    .withIntegrationName(integrationName)
+ *    .withCsvMapper(...)
+ *    .withCoder(...));
+ * }</pre>
+ */
+public class SnowflakeIO {
+  private static final Logger LOG = LoggerFactory.getLogger(SnowflakeIO.class);
+
+  private static final String CSV_QUOTE_CHAR = "'";
+
+  /**
+   * Read data from Snowflake via COPY statement via user-defined {@link SnowflakeService}.
+   *
+   * @param snowflakeService user-defined {@link SnowflakeService}
+   * @param <T> Type of the data to be read.
+   */
+  public static <T> Read<T> read(
+      SnowflakeService snowflakeService, SnowflakeCloudProvider snowflakeCloudProvider) {
+    return new AutoValue_SnowflakeIO_Read.Builder<T>()
+        .setSnowflakeService(snowflakeService)
+        .setSnowflakeCloudProvider(snowflakeCloudProvider)
+        .build();
+  }
+
+  /**
+   * Read data from Snowflake via COPY statement via default {@link SnowflakeServiceImpl}.
+   *
+   * @param <T> Type of the data to be read.
+   */
+  public static <T> Read<T> read() {
+    return read(new SnowflakeServiceImpl(), new GCSProvider());
+  }
+
+  /**
+   * Interface for user-defined function mapping parts of CSV line into T. Used for
+   * SnowflakeIO.Read.
+   *
+   * @param <T> Type of data to be read.
+   */
+  @FunctionalInterface
+  public interface CsvMapper<T> extends Serializable {
+    T mapRow(String[] parts) throws Exception;
+  }
+
+  /** Implementation of {@link #read()}. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getIntegrationName();
+
+    @Nullable
+    abstract String getStagingBucketName();
+
+    @Nullable
+    abstract CsvMapper<T> getCsvMapper();
+
+    @Nullable
+    abstract Coder<T> getCoder();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    @Nullable
+    abstract SnowflakeCloudProvider getSnowflakeCloudProvider();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setIntegrationName(String integrationName);
+
+      abstract Builder<T> setStagingBucketName(String stagingBucketName);
+
+      abstract Builder<T> setCsvMapper(CsvMapper<T> csvMapper);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Builder<T> setSnowflakeCloudProvider(SnowflakeCloudProvider snowflakeCloudProvider);
+
+      abstract Read<T> build();
+    }
+
+    public Read<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+
+      try {
+        Connection connection = config.buildDatasource().getConnection();
+        connection.close();
+      } catch (SQLException e) {
+        throw new IllegalArgumentException(
+            "Invalid DataSourceConfiguration. Underlying cause: " + e);
+      }
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    public Read<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    public Read<T> fromQuery(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    public Read<T> fromTable(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    public Read<T> withStagingBucketName(String stagingBucketName) {
+      return toBuilder().setStagingBucketName(stagingBucketName).build();
+    }
+
+    public Read<T> withIntegrationName(String integrationName) {
+      return toBuilder().setIntegrationName(integrationName).build();
+    }
+
+    public Read<T> withCsvMapper(CsvMapper<T> csvMapper) {
+      return toBuilder().setCsvMapper(csvMapper).build();
+    }
+
+    public Read<T> withCoder(Coder<T> coder) {
+      return toBuilder().setCoder(coder).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      // Either table or query is required. If query is present, it's being used, table is used
+      // otherwise
+      checkArgument(
+          getQuery() != null || getTable() != null, "fromTable() or fromQuery() is required");
+      checkArgument(
+          !(getQuery() != null && getTable() != null),
+          "fromTable() and fromQuery() is not allowed together");

Review comment:
       nit "**are** not allowed"

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/package-info.java
##########
@@ -0,0 +1,2 @@
+/** Credentials for SnowflakeIO. */

Review comment:
       Add ASF license header

##########
File path: sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeDatabase.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake.test;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import net.snowflake.client.jdbc.SnowflakeSQLException;
+
+/** Fake implementation of SnowFlake warehouse used in test code. */
+public class FakeSnowflakeDatabase implements Serializable {

Review comment:
       Are there any already-implemented mock/fake Snowflake databases that can be used for testing?

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import static org.apache.beam.sdk.io.TextIO.readFiles;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.opencsv.CSVParser;
+import com.opencsv.CSVParserBuilder;
+import java.io.IOException;
+import java.io.Serializable;
+import java.security.PrivateKey;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+import javax.sql.DataSource;
+import net.snowflake.client.jdbc.SnowflakeBasicDataSource;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.snowflake.credentials.KeyPairSnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.OAuthTokenSnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.UsernamePasswordSnowflakeCredentials;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Wait;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * IO to read and write data on Snowflake.
+ *
+ * <p>SnowflakeIO uses <a href="https://docs.snowflake.net/manuals/user-guide/jdbc.html">Snowflake
+ * JDBC</a> driver under the hood, but data isn't read/written using JDBC directly. Instead,
+ * SnowflakeIO uses dedicated <b>COPY</b> operations to read/write data from/to Google Cloud
+ * Storage.
+ *
+ * <p>To configure SnowflakeIO to read/write from your Snowflake instance, you have to provide a
+ * {@link DataSourceConfiguration} using {@link
+ * DataSourceConfiguration#create(SnowflakeCredentials)}, where {@link SnowflakeCredentials} might be
+ * created using {@link org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory}.
+ * Additionally one of {@link DataSourceConfiguration#withServerName(String)} or {@link
+ * DataSourceConfiguration#withUrl(String)} must be used to tell SnowflakeIO which instance to use.
+ * <br>
+ * There are also other options available to configure connection to Snowflake:
+ *
+ * <ul>
+ *   <li>{@link DataSourceConfiguration#withWarehouse(String)} to specify which Warehouse to use
+ *   <li>{@link DataSourceConfiguration#withDatabase(String)} to specify which Database to connect
+ *       to
+ *   <li>{@link DataSourceConfiguration#withSchema(String)} to specify which schema to use
+ *   <li>{@link DataSourceConfiguration#withRole(String)} to specify which role to use
+ *   <li>{@link DataSourceConfiguration#withLoginTimeout(Integer)} to specify the timeout for the
+ *       login
+ *   <li>{@link DataSourceConfiguration#withPortNumber(Integer)} to specify custom port of Snowflake
+ *       instance
+ * </ul>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * SnowflakeIO.DataSourceConfiguration dataSourceConfiguration =
+ *     SnowflakeIO.DataSourceConfiguration.create(SnowflakeCredentialsFactory.of(options))
+ *         .withServerName(options.getServerName())
+ *         .withWarehouse(options.getWarehouse())
+ *         .withDatabase(options.getDatabase())
+ *         .withSchema(options.getSchema());
+ * }</pre>
+ *
+ * <h3>Reading from Snowflake</h3>
+ *
+ * <p>SnowflakeIO.Read returns a bounded collection of {@code T} as a {@code PCollection<T>}. T is
+ * the type returned by the provided {@link CsvMapper}.
+ *
+ * <p>For example
+ *
+ * <pre>{@code
+ * PCollection<GenericRecord> items = pipeline.apply(
+ *  SnowflakeIO.<GenericRecord>read()
+ *    .withDataSourceConfiguration(dataSourceConfiguration)
+ *    .fromQuery(QUERY)
+ *    .withStagingBucketName(stagingBucketName)
+ *    .withIntegrationName(integrationName)
+ *    .withCsvMapper(...)
+ *    .withCoder(...));
+ * }</pre>
+ */
+public class SnowflakeIO {
+  private static final Logger LOG = LoggerFactory.getLogger(SnowflakeIO.class);
+
+  private static final String CSV_QUOTE_CHAR = "'";
+
+  /**
+   * Read data from Snowflake via COPY statement via user-defined {@link SnowflakeService}.
+   *
+   * @param snowflakeService user-defined {@link SnowflakeService}
+   * @param <T> Type of the data to be read.
+   */
+  public static <T> Read<T> read(
+      SnowflakeService snowflakeService, SnowflakeCloudProvider snowflakeCloudProvider) {
+    return new AutoValue_SnowflakeIO_Read.Builder<T>()
+        .setSnowflakeService(snowflakeService)
+        .setSnowflakeCloudProvider(snowflakeCloudProvider)
+        .build();
+  }
+
+  /**
+   * Read data from Snowflake via COPY statement via default {@link SnowflakeServiceImpl}.
+   *
+   * @param <T> Type of the data to be read.
+   */
+  public static <T> Read<T> read() {
+    return read(new SnowflakeServiceImpl(), new GCSProvider());
+  }
+
+  /**
+   * Interface for user-defined function mapping parts of CSV line into T. Used for
+   * SnowflakeIO.Read.
+   *
+   * @param <T> Type of data to be read.
+   */
+  @FunctionalInterface
+  public interface CsvMapper<T> extends Serializable {
+    T mapRow(String[] parts) throws Exception;
+  }
+
+  /** Implementation of {@link #read()}. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getIntegrationName();
+
+    @Nullable
+    abstract String getStagingBucketName();
+
+    @Nullable
+    abstract CsvMapper<T> getCsvMapper();
+
+    @Nullable
+    abstract Coder<T> getCoder();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    @Nullable
+    abstract SnowflakeCloudProvider getSnowflakeCloudProvider();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setIntegrationName(String integrationName);
+
+      abstract Builder<T> setStagingBucketName(String stagingBucketName);
+
+      abstract Builder<T> setCsvMapper(CsvMapper<T> csvMapper);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Builder<T> setSnowflakeCloudProvider(SnowflakeCloudProvider snowflakeCloudProvider);
+
+      abstract Read<T> build();
+    }
+
+    public Read<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+
+      try {
+        Connection connection = config.buildDatasource().getConnection();
+        connection.close();
+      } catch (SQLException e) {
+        throw new IllegalArgumentException(
+            "Invalid DataSourceConfiguration. Underlying cause: " + e);
+      }
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    public Read<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    public Read<T> fromQuery(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    public Read<T> fromTable(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    public Read<T> withStagingBucketName(String stagingBucketName) {
+      return toBuilder().setStagingBucketName(stagingBucketName).build();
+    }
+
+    public Read<T> withIntegrationName(String integrationName) {
+      return toBuilder().setIntegrationName(integrationName).build();
+    }
+
+    public Read<T> withCsvMapper(CsvMapper<T> csvMapper) {
+      return toBuilder().setCsvMapper(csvMapper).build();
+    }
+
+    public Read<T> withCoder(Coder<T> coder) {
+      return toBuilder().setCoder(coder).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      // Either table or query is required. If query is present, it's being used, table is used
+      // otherwise
+      checkArgument(
+          getQuery() != null || getTable() != null, "fromTable() or fromQuery() is required");
+      checkArgument(
+          !(getQuery() != null && getTable() != null),
+          "fromTable() and fromQuery() is not allowed together");
+      checkArgument(getCsvMapper() != null, "withCsvMapper() is required");
+      checkArgument(getCoder() != null, "withCoder() is required");
+      checkArgument(getIntegrationName() != null, "withIntegrationName() is required");
+      checkArgument(getStagingBucketName() != null, "withStagingBucketName() is required");
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      String gcpTmpDirName = makeTmpDirName();

Review comment:
       I believe, in general, COPY INTO can use different external locations, such as Amazon S3, Google Cloud Storage, or Microsoft Azure. So please make this configurable for users, even if this IO currently supports only GCS.
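
       A minimal sketch of what that configurability could look like, reusing the SnowflakeCloudProvider already passed into read(...); the method name below is an assumption, since the interface itself is not shown in this diff:

           // Illustrative only: each provider formats the external location that
           // Snowflake's COPY INTO expects for its storage backend.
           public interface SnowflakeCloudProvider extends Serializable {
             String formatExternalLocation(String stagingBucketName, String tmpDirName);
             // ... any other provider methods the PR defines
           }

           public class GCSProvider implements SnowflakeCloudProvider {
             @Override
             public String formatExternalLocation(String stagingBucketName, String tmpDirName) {
               return String.format("gcs://%s/%s/", stagingBucketName, tmpDirName);
             }
           }

           public class S3Provider implements SnowflakeCloudProvider {
             @Override
             public String formatExternalLocation(String stagingBucketName, String tmpDirName) {
               return String.format("s3://%s/%s/", stagingBucketName, tmpDirName);
             }
           }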

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/package-info.java
##########
@@ -0,0 +1,2 @@
+/** Snowflake IO transforms. */

Review comment:
       Add ASF license header

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeServiceImpl.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.function.Consumer;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+
+/**
+ * Implementation of {@link org.apache.beam.sdk.io.snowflake.SnowflakeService} used in production.
+ */
+public class SnowflakeServiceImpl implements SnowflakeService {
+
+  @Override
+  public String copyIntoStage(
+      SerializableFunction<Void, DataSource> dataSourceProviderFn,
+      String query,
+      String table,
+      String integrationName,
+      String stagingBucketName,
+      String tmpDirName)
+      throws SQLException {
+
+    String from;
+    if (query != null) {
+      // Query must be surrounded with brackets
+      from = String.format("(%s)", query);
+    } else {
+      from = table;
+    }
+
+    String externalLocation = String.format("gcs://%s/%s/", stagingBucketName, tmpDirName);

Review comment:
       Is this a GCS-specific implementation? I think we need to make the type of external location configurable.
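
       Tying this to the hard-coded "gcs://" line above: assuming the provider is passed into copyIntoStage, the scheme-specific formatting could be delegated to it (the method name is an assumption, see the sketch in the earlier comment):

           // Sketch: let the injected SnowflakeCloudProvider build the external location
           // instead of hard-coding the GCS URL scheme here.
           String externalLocation =
               cloudProvider.formatExternalLocation(stagingBucketName, tmpDirName);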




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org