Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/16 21:21:15 UTC

[GitHub] [beam] chamikaramj commented on a change in pull request #11360: [BEAM-9722] added SnowflakeIO with Read operation

chamikaramj commented on a change in pull request #11360:
URL: https://github.com/apache/beam/pull/11360#discussion_r426190761



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -0,0 +1,735 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import static org.apache.beam.sdk.io.TextIO.readFiles;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.opencsv.CSVParser;
+import com.opencsv.CSVParserBuilder;
+import java.io.IOException;
+import java.io.Serializable;
+import java.security.PrivateKey;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import javax.sql.DataSource;
+import net.snowflake.client.jdbc.SnowflakeBasicDataSource;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.snowflake.credentials.KeyPairSnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.OAuthTokenSnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.UsernamePasswordSnowflakeCredentials;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Wait;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * IO to read and write data on Snowflake.
+ *
+ * <p>SnowflakeIO uses <a href="https://docs.snowflake.net/manuals/user-guide/jdbc.html">Snowflake
+ * JDBC</a> driver under the hood, but data isn't read/written using JDBC directly. Instead,
+ * SnowflakeIO uses dedicated <b>COPY</b> operations to read/write data from/to a cloud bucket. By
+ * now only Google Cloud Storage is supported.
+ *
+ * <p>To configure SnowflakeIO to read/write from your Snowflake instance, you have to provide a
+ * {@link DataSourceConfiguration} using {@link
+ * DataSourceConfiguration#create(SnowflakeCredentials)}, where {@link SnowflakeCredentials might be
+ * created using {@link org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory}}.
+ * Additionally one of {@link DataSourceConfiguration#withServerName(String)} or {@link
+ * DataSourceConfiguration#withUrl(String)} must be used to tell SnowflakeIO which instance to use.
+ * <br>
+ * There are also other options available to configure connection to Snowflake:
+ *
+ * <ul>
+ *   <li>{@link DataSourceConfiguration#withWarehouse(String)} to specify which Warehouse to use
+ *   <li>{@link DataSourceConfiguration#withDatabase(String)} to specify which Database to connect
+ *       to
+ *   <li>{@link DataSourceConfiguration#withSchema(String)} to specify which schema to use
+ *   <li>{@link DataSourceConfiguration#withRole(String)} to specify which role to use
+ *   <li>{@link DataSourceConfiguration#withLoginTimeout(Integer)} to specify the timeout for the
+ *       login
+ *   <li>{@link DataSourceConfiguration#withPortNumber(Integer)} to specify custom port of Snowflake
+ *       instance
+ * </ul>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * SnowflakeIO.DataSourceConfiguration dataSourceConfiguration =
+ *     SnowflakeIO.DataSourceConfiguration.create(SnowflakeCredentialsFactory.of(options))
+ *         .withServerName(options.getServerName())
+ *         .withWarehouse(options.getWarehouse())
+ *         .withDatabase(options.getDatabase())
+ *         .withSchema(options.getSchema());
+ * }</pre>
+ *
+ * <h3>Reading from Snowflake</h3>
+ *
+ * <p>SnowflakeIO.Read returns a bounded collection of {@code T} as a {@code PCollection<T>}. T is
+ * the type returned by the provided {@link CsvMapper}.
+ *
+ * <p>For example
+ *
+ * <pre>{@code
+ * PCollection<GenericRecord> items = pipeline.apply(
+ *  SnowflakeIO.<GenericRecord>read()
+ *    .withDataSourceConfiguration(dataSourceConfiguration)
+ *    .fromQuery(QUERY)
+ *    .withStagingBucketName(stagingBucketName)
+ *    .withIntegrationName(integrationName)
+ *    .withCsvMapper(...)
+ *    .withCoder(...));
+ * }</pre>
+ *
+ * <p><b>Important</b> When reading data from Snowflake, temporary CSV files are created on the
+ * specified stagingBucketName in directory named `sf_copy_csv_[RANDOM CHARS]_[TIMESTAMP]`. This
+ * directory and all the files are deleted automatically by default, but in case of failed pipeline

Review comment:
       s/deleted/cleaned up

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -0,0 +1,735 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import static org.apache.beam.sdk.io.TextIO.readFiles;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.opencsv.CSVParser;
+import com.opencsv.CSVParserBuilder;
+import java.io.IOException;
+import java.io.Serializable;
+import java.security.PrivateKey;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import javax.sql.DataSource;
+import net.snowflake.client.jdbc.SnowflakeBasicDataSource;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.snowflake.credentials.KeyPairSnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.OAuthTokenSnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.UsernamePasswordSnowflakeCredentials;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Wait;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * IO to read and write data on Snowflake.
+ *
+ * <p>SnowflakeIO uses <a href="https://docs.snowflake.net/manuals/user-guide/jdbc.html">Snowflake
+ * JDBC</a> driver under the hood, but data isn't read/written using JDBC directly. Instead,
+ * SnowflakeIO uses dedicated <b>COPY</b> operations to read/write data from/to a cloud bucket. By
+ * now only Google Cloud Storage is supported.
+ *
+ * <p>To configure SnowflakeIO to read/write from your Snowflake instance, you have to provide a
+ * {@link DataSourceConfiguration} using {@link
+ * DataSourceConfiguration#create(SnowflakeCredentials)}, where {@link SnowflakeCredentials might be
+ * created using {@link org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory}}.
+ * Additionally one of {@link DataSourceConfiguration#withServerName(String)} or {@link
+ * DataSourceConfiguration#withUrl(String)} must be used to tell SnowflakeIO which instance to use.
+ * <br>
+ * There are also other options available to configure connection to Snowflake:
+ *
+ * <ul>
+ *   <li>{@link DataSourceConfiguration#withWarehouse(String)} to specify which Warehouse to use
+ *   <li>{@link DataSourceConfiguration#withDatabase(String)} to specify which Database to connect
+ *       to
+ *   <li>{@link DataSourceConfiguration#withSchema(String)} to specify which schema to use
+ *   <li>{@link DataSourceConfiguration#withRole(String)} to specify which role to use
+ *   <li>{@link DataSourceConfiguration#withLoginTimeout(Integer)} to specify the timeout for the
+ *       login
+ *   <li>{@link DataSourceConfiguration#withPortNumber(Integer)} to specify custom port of Snowflake
+ *       instance
+ * </ul>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * SnowflakeIO.DataSourceConfiguration dataSourceConfiguration =
+ *     SnowflakeIO.DataSourceConfiguration.create(SnowflakeCredentialsFactory.of(options))
+ *         .withServerName(options.getServerName())
+ *         .withWarehouse(options.getWarehouse())
+ *         .withDatabase(options.getDatabase())
+ *         .withSchema(options.getSchema());
+ * }</pre>
+ *
+ * <h3>Reading from Snowflake</h3>
+ *
+ * <p>SnowflakeIO.Read returns a bounded collection of {@code T} as a {@code PCollection<T>}. T is
+ * the type returned by the provided {@link CsvMapper}.
+ *
+ * <p>For example
+ *
+ * <pre>{@code
+ * PCollection<GenericRecord> items = pipeline.apply(
+ *  SnowflakeIO.<GenericRecord>read()
+ *    .withDataSourceConfiguration(dataSourceConfiguration)
+ *    .fromQuery(QUERY)
+ *    .withStagingBucketName(stagingBucketName)
+ *    .withIntegrationName(integrationName)
+ *    .withCsvMapper(...)
+ *    .withCoder(...));
+ * }</pre>
+ *
+ * <p><b>Important</b> When reading data from Snowflake, temporary CSV files are created on the
+ * specified stagingBucketName in directory named `sf_copy_csv_[RANDOM CHARS]_[TIMESTAMP]`. This
+ * directory and all the files are deleted automatically by default, but in case of failed pipeline
+ * they will remain and have to be removed manually.

Review comment:
       they may remain and will have to ...

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -0,0 +1,735 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import static org.apache.beam.sdk.io.TextIO.readFiles;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.opencsv.CSVParser;
+import com.opencsv.CSVParserBuilder;
+import java.io.IOException;
+import java.io.Serializable;
+import java.security.PrivateKey;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import javax.sql.DataSource;
+import net.snowflake.client.jdbc.SnowflakeBasicDataSource;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.snowflake.credentials.KeyPairSnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.OAuthTokenSnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.UsernamePasswordSnowflakeCredentials;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Wait;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * IO to read and write data on Snowflake.
+ *
+ * <p>SnowflakeIO uses <a href="https://docs.snowflake.net/manuals/user-guide/jdbc.html">Snowflake
+ * JDBC</a> driver under the hood, but data isn't read/written using JDBC directly. Instead,
+ * SnowflakeIO uses dedicated <b>COPY</b> operations to read/write data from/to a cloud bucket. By
+ * now only Google Cloud Storage is supported.
+ *
+ * <p>To configure SnowflakeIO to read/write from your Snowflake instance, you have to provide a
+ * {@link DataSourceConfiguration} using {@link
+ * DataSourceConfiguration#create(SnowflakeCredentials)}, where {@link SnowflakeCredentials might be
+ * created using {@link org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory}}.
+ * Additionally one of {@link DataSourceConfiguration#withServerName(String)} or {@link
+ * DataSourceConfiguration#withUrl(String)} must be used to tell SnowflakeIO which instance to use.
+ * <br>
+ * There are also other options available to configure connection to Snowflake:
+ *
+ * <ul>
+ *   <li>{@link DataSourceConfiguration#withWarehouse(String)} to specify which Warehouse to use
+ *   <li>{@link DataSourceConfiguration#withDatabase(String)} to specify which Database to connect
+ *       to
+ *   <li>{@link DataSourceConfiguration#withSchema(String)} to specify which schema to use
+ *   <li>{@link DataSourceConfiguration#withRole(String)} to specify which role to use
+ *   <li>{@link DataSourceConfiguration#withLoginTimeout(Integer)} to specify the timeout for the
+ *       login
+ *   <li>{@link DataSourceConfiguration#withPortNumber(Integer)} to specify custom port of Snowflake
+ *       instance
+ * </ul>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * SnowflakeIO.DataSourceConfiguration dataSourceConfiguration =
+ *     SnowflakeIO.DataSourceConfiguration.create(SnowflakeCredentialsFactory.of(options))
+ *         .withServerName(options.getServerName())
+ *         .withWarehouse(options.getWarehouse())
+ *         .withDatabase(options.getDatabase())
+ *         .withSchema(options.getSchema());
+ * }</pre>
+ *
+ * <h3>Reading from Snowflake</h3>
+ *
+ * <p>SnowflakeIO.Read returns a bounded collection of {@code T} as a {@code PCollection<T>}. T is
+ * the type returned by the provided {@link CsvMapper}.
+ *
+ * <p>For example
+ *
+ * <pre>{@code
+ * PCollection<GenericRecord> items = pipeline.apply(
+ *  SnowflakeIO.<GenericRecord>read()
+ *    .withDataSourceConfiguration(dataSourceConfiguration)
+ *    .fromQuery(QUERY)
+ *    .withStagingBucketName(stagingBucketName)
+ *    .withIntegrationName(integrationName)
+ *    .withCsvMapper(...)
+ *    .withCoder(...));
+ * }</pre>
+ *
+ * <p><b>Important</b> When reading data from Snowflake, temporary CSV files are created on the
+ * specified stagingBucketName in directory named `sf_copy_csv_[RANDOM CHARS]_[TIMESTAMP]`. This
+ * directory and all the files are deleted automatically by default, but in case of failed pipeline
+ * they will remain and have to be removed manually.
+ */
+public class SnowflakeIO {
+  private static final Logger LOG = LoggerFactory.getLogger(SnowflakeIO.class);
+
+  private static final String CSV_QUOTE_CHAR = "'";
+  /**
+   * Read data from Snowflake.
+   *
+   * @param snowflakeService user-defined {@link SnowflakeService}
+   * @param <T> Type of the data to be read.
+   */
+  public static <T> Read<T> read(SnowflakeService snowflakeService) {
+    return new AutoValue_SnowflakeIO_Read.Builder<T>()
+        .setSnowflakeService(snowflakeService)
+        .build();
+  }
+
+  /**
+   * Read data from Snowflake.
+   *
+   * @param <T> Type of the data to be read.
+   */
+  public static <T> Read<T> read() {
+    return read(new SnowflakeServiceImpl());
+  }
+
+  /**
+   * Interface for user-defined function mapping parts of CSV line into T. Used for
+   * SnowflakeIO.Read.
+   *
+   * @param <T> Type of data to be read.
+   */
+  @FunctionalInterface
+  public interface CsvMapper<T> extends Serializable {
+    T mapRow(String[] parts) throws Exception;
+  }
+
+  /** Implementation of {@link #read()}. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getIntegrationName();
+
+    @Nullable
+    abstract String getStagingBucketName();
+
+    @Nullable
+    abstract CsvMapper<T> getCsvMapper();
+
+    @Nullable
+    abstract Coder<T> getCoder();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setIntegrationName(String integrationName);
+
+      abstract Builder<T> setStagingBucketName(String stagingBucketName);
+
+      abstract Builder<T> setCsvMapper(CsvMapper<T> csvMapper);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Read<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Read<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+
+      try {
+        Connection connection = config.buildDatasource().getConnection();
+        connection.close();
+      } catch (SQLException e) {
+        throw new IllegalArgumentException(
+            "Invalid DataSourceConfiguration. Underlying cause: " + e);
+      }
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Read<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Read<T> fromQuery(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A table name to be read in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Read<T> fromTable(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * Name of the cloud bucket (GCS by now) to use as tmp location of CSVs during COPY statement.
+     *
+     * @param stagingBucketName - String with the name of the bucket.
+     */
+    public Read<T> withStagingBucketName(String stagingBucketName) {
+      return toBuilder().setStagingBucketName(stagingBucketName).build();
+    }
+
+    /**
+     * Name of the Storage Integration in Snowflake to be used. See
+     * https://docs.snowflake.com/en/sql-reference/sql/create-storage-integration.html for
+     * reference.
+     *
+     * @param integrationName - String with the name of the Storage Integration.
+     */
+    public Read<T> withIntegrationName(String integrationName) {
+      return toBuilder().setIntegrationName(integrationName).build();
+    }
+
+    /**
+     * User-defined function mapping CSV lines into user data.
+     *
+     * @param csvMapper - an instance of {@link CsvMapper}.
+     */
+    public Read<T> withCsvMapper(CsvMapper<T> csvMapper) {
+      return toBuilder().setCsvMapper(csvMapper).build();
+    }
+
+    /**
+     * A Coder to be used by the output PCollection generated by the source.
+     *
+     * @param coder - an instance of {@link Coder}.
+     */
+    public Read<T> withCoder(Coder<T> coder) {
+      return toBuilder().setCoder(coder).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      // Either table or query is required. If query is present, it's being used, table is used
+      // otherwise
+      checkArgument(
+          getQuery() != null || getTable() != null, "fromTable() or fromQuery() is required");
+      checkArgument(
+          !(getQuery() != null && getTable() != null),
+          "fromTable() and fromQuery() are not allowed together");
+      checkArgument(getCsvMapper() != null, "withCsvMapper() is required");
+      checkArgument(getCoder() != null, "withCoder() is required");
+      checkArgument(getIntegrationName() != null, "withIntegrationName() is required");
+      checkArgument(getStagingBucketName() != null, "withStagingBucketName() is required");
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      String gcpTmpDirName = makeTmpDirName();
+      String stagingBucketDir = String.format("%s/%s/", getStagingBucketName(), gcpTmpDirName);
+
+      PCollection<Void> emptyCollection = input.apply(Create.of((Void) null));
+
+      PCollection<T> output =
+          emptyCollection
+              .apply(
+                  ParDo.of(
+                      new CopyIntoStageFn(

Review comment:
       What will happen in case of a failure and a retry of a bundle?
   
   I think you should add a Reshuffle step after the CopyIntoStageFn() step to checkpoint the set of files (supported by most runners).
   
   Also, make sure you have considered what would happen if CopyIntoStageFn() itself fails and is retried. For example, the BQ source looks for an already started export job before starting a new one.
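   
   A minimal sketch of the suggested checkpoint, assuming Beam's built-in org.apache.beam.sdk.transforms.Reshuffle and using stagedFilePaths as a stand-in name for the PCollection<String> produced by the CopyIntoStageFn ParDo in expand(); the placement is illustrative, not the PR's final code:
   
       // stagedFilePaths: the PCollection<String> of staging-file paths emitted by CopyIntoStageFn.
       PCollection<T> output =
           stagedFilePaths
               // Reshuffle materializes and checkpoints the staged-file paths, so a retried
               // downstream bundle re-reads the same files instead of redoing upstream work.
               .apply(Reshuffle.viaRandomKey())
               .apply(FileIO.matchAll())
               .apply(FileIO.readMatches())
               .apply(readFiles())
               .apply(ParDo.of(new MapCsvToStringArrayFn()))
               .apply(ParDo.of(new MapStringArrayToUserDataFn<>(getCsvMapper())));
   
   Making CopyIntoStageFn itself idempotent on retry (for example, checking whether the staging directory is already populated before issuing another COPY) would still need to be handled separately, as noted above.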

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import static org.apache.beam.sdk.io.TextIO.readFiles;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.opencsv.CSVParser;
+import com.opencsv.CSVParserBuilder;
+import java.io.IOException;
+import java.io.Serializable;
+import java.security.PrivateKey;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+import javax.sql.DataSource;
+import net.snowflake.client.jdbc.SnowflakeBasicDataSource;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.snowflake.credentials.KeyPairSnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.OAuthTokenSnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.UsernamePasswordSnowflakeCredentials;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Wait;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * IO to read and write data on Snowflake.
+ *
+ * <p>SnowflakeIO uses <a href="https://docs.snowflake.net/manuals/user-guide/jdbc.html">Snowflake
+ * JDBC</a> driver under the hood, but data isn't read/written using JDBC directly. Instead,
+ * SnowflakeIO uses dedicated <b>COPY</b> operations to read/write data from/to Google Cloud
+ * Storage.
+ *
+ * <p>To configure SnowflakeIO to read/write from your Snowflake instance, you have to provide a
+ * {@link DataSourceConfiguration} using {@link
+ * DataSourceConfiguration#create(SnowflakeCredentials)}, where {@link SnowflakeCredentials might be
+ * created using {@link org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory}}.
+ * Additionally one of {@link DataSourceConfiguration#withServerName(String)} or {@link
+ * DataSourceConfiguration#withUrl(String)} must be used to tell SnowflakeIO which instance to use.
+ * <br>
+ * There are also other options available to configure connection to Snowflake:
+ *
+ * <ul>
+ *   <li>{@link DataSourceConfiguration#withWarehouse(String)} to specify which Warehouse to use
+ *   <li>{@link DataSourceConfiguration#withDatabase(String)} to specify which Database to connect
+ *       to
+ *   <li>{@link DataSourceConfiguration#withSchema(String)} to specify which schema to use
+ *   <li>{@link DataSourceConfiguration#withRole(String)} to specify which role to use
+ *   <li>{@link DataSourceConfiguration#withLoginTimeout(Integer)} to specify the timeout for the
+ *       login
+ *   <li>{@link DataSourceConfiguration#withPortNumber(Integer)} to specify custom port of Snowflake
+ *       instance
+ * </ul>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * SnowflakeIO.DataSourceConfiguration dataSourceConfiguration =
+ *     SnowflakeIO.DataSourceConfiguration.create(SnowflakeCredentialsFactory.of(options))
+ *         .withServerName(options.getServerName())
+ *         .withWarehouse(options.getWarehouse())
+ *         .withDatabase(options.getDatabase())
+ *         .withSchema(options.getSchema());
+ * }</pre>
+ *
+ * <h3>Reading from Snowflake</h3>
+ *
+ * <p>SnowflakeIO.Read returns a bounded collection of {@code T} as a {@code PCollection<T>}. T is
+ * the type returned by the provided {@link CsvMapper}.
+ *
+ * <p>For example
+ *
+ * <pre>{@code
+ * PCollection<GenericRecord> items = pipeline.apply(
+ *  SnowflakeIO.<GenericRecord>read()
+ *    .withDataSourceConfiguration(dataSourceConfiguration)
+ *    .fromQuery(QUERY)
+ *    .withStagingBucketName(stagingBucketName)
+ *    .withIntegrationName(integrationName)
+ *    .withCsvMapper(...)
+ *    .withCoder(...));
+ * }</pre>
+ */
+public class SnowflakeIO {
+  private static final Logger LOG = LoggerFactory.getLogger(SnowflakeIO.class);
+
+  private static final String CSV_QUOTE_CHAR = "'";
+
+  /**
+   * Read data from Snowflake via COPY statement via user-defined {@link SnowflakeService}.
+   *
+   * @param snowflakeService user-defined {@link SnowflakeService}
+   * @param <T> Type of the data to be read.
+   */
+  public static <T> Read<T> read(
+      SnowflakeService snowflakeService, SnowflakeCloudProvider snowflakeCloudProvider) {
+    return new AutoValue_SnowflakeIO_Read.Builder<T>()
+        .setSnowflakeService(snowflakeService)
+        .setSnowflakeCloudProvider(snowflakeCloudProvider)
+        .build();
+  }
+
+  /**
+   * Read data from Snowflake via COPY statement via default {@link SnowflakeServiceImpl}.
+   *
+   * @param <T> Type of the data to be read.
+   */
+  public static <T> Read<T> read() {
+    return read(new SnowflakeServiceImpl(), new GCSProvider());
+  }
+
+  /**
+   * Interface for user-defined function mapping parts of CSV line into T. Used for
+   * SnowflakeIO.Read.
+   *
+   * @param <T> Type of data to be read.
+   */
+  @FunctionalInterface
+  public interface CsvMapper<T> extends Serializable {
+    T mapRow(String[] parts) throws Exception;
+  }
+
+  /** Implementation of {@link #read()}. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getIntegrationName();
+
+    @Nullable
+    abstract String getStagingBucketName();
+
+    @Nullable
+    abstract CsvMapper<T> getCsvMapper();
+
+    @Nullable
+    abstract Coder<T> getCoder();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    @Nullable
+    abstract SnowflakeCloudProvider getSnowflakeCloudProvider();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setIntegrationName(String integrationName);
+
+      abstract Builder<T> setStagingBucketName(String stagingBucketName);
+
+      abstract Builder<T> setCsvMapper(CsvMapper<T> csvMapper);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Builder<T> setSnowflakeCloudProvider(SnowflakeCloudProvider snowflakeCloudProvider);
+
+      abstract Read<T> build();
+    }
+
+    public Read<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+
+      try {
+        Connection connection = config.buildDatasource().getConnection();

Review comment:
       Probably you should provide a withoutValidation() option so that users who submit a pipeline from a node that does not have access to the server can disable this.
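   
   A hedged sketch of what such an option could look like on the Read builder quoted above; the withoutValidation() name and the validate flag are assumptions (modeled on the withoutValidation() options in other Beam IOs), not code from this PR:
   
       // Hypothetical validation flag on Read (the read() factory would set it to true by default,
       // and Builder<T> would gain a matching setValidate(boolean) method):
       abstract boolean getValidate();
   
       public Read<T> withoutValidation() {
         // Lets a pipeline be constructed on a node that cannot reach Snowflake at submission time.
         return toBuilder().setValidate(false).build();
       }
   
       public Read<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
         if (getValidate()) {
           try (Connection connection = config.buildDatasource().getConnection()) {
             // Connection opened only to fail fast on an invalid configuration.
           } catch (SQLException e) {
             throw new IllegalArgumentException(
                 "Invalid DataSourceConfiguration. Underlying cause: " + e);
           }
         }
         return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
       }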

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -0,0 +1,768 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import static org.apache.beam.sdk.io.TextIO.readFiles;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.opencsv.CSVParser;
+import com.opencsv.CSVParserBuilder;
+import java.io.IOException;
+import java.io.Serializable;
+import java.security.PrivateKey;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+import javax.sql.DataSource;
+import net.snowflake.client.jdbc.SnowflakeBasicDataSource;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.snowflake.credentials.KeyPairSnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.OAuthTokenSnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.UsernamePasswordSnowflakeCredentials;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Wait;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * IO to read and write data on Snowflake.
+ *
+ * <p>SnowflakeIO uses <a href="https://docs.snowflake.net/manuals/user-guide/jdbc.html">Snowflake
+ * JDBC</a> driver under the hood, but data isn't read/written using JDBC directly. Instead,
+ * SnowflakeIO uses dedicated <b>COPY</b> operations to read/write data from/to Google Cloud
+ * Storage.
+ *
+ * <p>To configure SnowflakeIO to read/write from your Snowflake instance, you have to provide a
+ * {@link DataSourceConfiguration} using {@link
+ * DataSourceConfiguration#create(SnowflakeCredentials)}, where {@link SnowflakeCredentials might be
+ * created using {@link org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory}}.
+ * Additionally one of {@link DataSourceConfiguration#withServerName(String)} or {@link
+ * DataSourceConfiguration#withUrl(String)} must be used to tell SnowflakeIO which instance to use.
+ * <br>
+ * There are also other options available to configure connection to Snowflake:
+ *
+ * <ul>
+ *   <li>{@link DataSourceConfiguration#withWarehouse(String)} to specify which Warehouse to use
+ *   <li>{@link DataSourceConfiguration#withDatabase(String)} to specify which Database to connect
+ *       to
+ *   <li>{@link DataSourceConfiguration#withSchema(String)} to specify which schema to use
+ *   <li>{@link DataSourceConfiguration#withRole(String)} to specify which role to use
+ *   <li>{@link DataSourceConfiguration#withLoginTimeout(Integer)} to specify the timeout for the
+ *       login
+ *   <li>{@link DataSourceConfiguration#withPortNumber(Integer)} to specify custom port of Snowflake
+ *       instance
+ * </ul>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * SnowflakeIO.DataSourceConfiguration dataSourceConfiguration =
+ *     SnowflakeIO.DataSourceConfiguration.create(SnowflakeCredentialsFactory.of(options))
+ *         .withServerName(options.getServerName())
+ *         .withWarehouse(options.getWarehouse())
+ *         .withDatabase(options.getDatabase())
+ *         .withSchema(options.getSchema());
+ * }</pre>
+ *
+ * <h3>Reading from Snowflake</h3>
+ *
+ * <p>SnowflakeIO.Read returns a bounded collection of {@code T} as a {@code PCollection<T>}. T is
+ * the type returned by the provided {@link CsvMapper}.
+ *
+ * <p>For example
+ *
+ * <pre>{@code
+ * PCollection<GenericRecord> items = pipeline.apply(
+ *  SnowflakeIO.<GenericRecord>read()
+ *    .withDataSourceConfiguration(dataSourceConfiguration)
+ *    .fromQuery(QUERY)
+ *    .withStagingBucketName(stagingBucketName)
+ *    .withIntegrationName(integrationName)
+ *    .withCsvMapper(...)
+ *    .withCoder(...));
+ * }</pre>
+ */
+public class SnowflakeIO {
+  private static final Logger LOG = LoggerFactory.getLogger(SnowflakeIO.class);
+
+  private static final String CSV_QUOTE_CHAR = "'";
+
+  /**
+   * Read data from Snowflake via COPY statement via user-defined {@link SnowflakeService}.
+   *
+   * @param snowflakeService user-defined {@link SnowflakeService}
+   * @param <T> Type of the data to be read.
+   */
+  public static <T> Read<T> read(
+      SnowflakeService snowflakeService, SnowflakeCloudProvider snowflakeCloudProvider) {
+    return new AutoValue_SnowflakeIO_Read.Builder<T>()
+        .setSnowflakeService(snowflakeService)
+        .setSnowflakeCloudProvider(snowflakeCloudProvider)
+        .build();
+  }
+
+  /**
+   * Read data from Snowflake via COPY statement via default {@link SnowflakeServiceImpl}.
+   *
+   * @param <T> Type of the data to be read.
+   */
+  public static <T> Read<T> read() {
+    return read(new SnowflakeServiceImpl(), new GCSProvider());
+  }
+
+  /**
+   * Interface for user-defined function mapping parts of CSV line into T. Used for
+   * SnowflakeIO.Read.
+   *
+   * @param <T> Type of data to be read.
+   */
+  @FunctionalInterface
+  public interface CsvMapper<T> extends Serializable {
+    T mapRow(String[] parts) throws Exception;
+  }
+
+  /** Implementation of {@link #read()}. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getIntegrationName();
+
+    @Nullable
+    abstract String getStagingBucketName();
+
+    @Nullable
+    abstract CsvMapper<T> getCsvMapper();
+
+    @Nullable
+    abstract Coder<T> getCoder();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    @Nullable
+    abstract SnowflakeCloudProvider getSnowflakeCloudProvider();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setIntegrationName(String integrationName);
+
+      abstract Builder<T> setStagingBucketName(String stagingBucketName);
+
+      abstract Builder<T> setCsvMapper(CsvMapper<T> csvMapper);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Builder<T> setSnowflakeCloudProvider(SnowflakeCloudProvider snowflakeCloudProvider);
+
+      abstract Read<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Read<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+
+      try {
+        Connection connection = config.buildDatasource().getConnection();
+        connection.close();
+      } catch (SQLException e) {
+        throw new IllegalArgumentException(
+            "Invalid DataSourceConfiguration. Underlying cause: " + e);
+      }
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Read<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Read<T> fromQuery(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A table name to be read in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Read<T> fromTable(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * Name of the bucket to use as tmp location of CSVs during COPY statement.
+     *
+     * @param stagingBucketName - String with the name of the bucket.
+     */
+    public Read<T> withStagingBucketName(String stagingBucketName) {
+      return toBuilder().setStagingBucketName(stagingBucketName).build();
+    }
+
+    /**
+     * Name of the Storage Integration in Snowflake to be used.
+     *
+     * @param integrationName - String with the name of the Storage Integration.
+     */
+    public Read<T> withIntegrationName(String integrationName) {
+      return toBuilder().setIntegrationName(integrationName).build();
+    }
+
+    /**
+     * User-defined function mapping CSV lines into user data.
+     *
+     * @param csvMapper - an instance of {@link CsvMapper}.
+     */
+    public Read<T> withCsvMapper(CsvMapper<T> csvMapper) {
+      return toBuilder().setCsvMapper(csvMapper).build();
+    }
+
+    /**
+     * A coder to use.
+     *
+     * @param coder - an instance of {@link Coder}.
+     */
+    public Read<T> withCoder(Coder<T> coder) {
+      return toBuilder().setCoder(coder).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      // Either table or query is required. If query is present, it's being used, table is used
+      // otherwise
+      checkArgument(
+          getQuery() != null || getTable() != null, "fromTable() or fromQuery() is required");
+      checkArgument(
+          !(getQuery() != null && getTable() != null),
+          "fromTable() and fromQuery() are not allowed together");
+      checkArgument(getCsvMapper() != null, "withCsvMapper() is required");
+      checkArgument(getCoder() != null, "withCoder() is required");
+      checkArgument(getIntegrationName() != null, "withIntegrationName() is required");
+      checkArgument(getStagingBucketName() != null, "withStagingBucketName() is required");
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      String gcpTmpDirName = makeTmpDirName();
+      PCollection<Void> emptyCollection = input.apply(Create.of((Void) null));
+
+      PCollection<T> output =
+          emptyCollection
+              .apply(
+                  ParDo.of(
+                      new CopyIntoStageFn(
+                          getDataSourceProviderFn(),
+                          getQuery(),
+                          getTable(),
+                          getIntegrationName(),
+                          getStagingBucketName(),
+                          gcpTmpDirName,
+                          getSnowflakeService(),
+                          getSnowflakeCloudProvider())))
+              .apply(FileIO.matchAll())
+              .apply(FileIO.readMatches())
+              .apply(readFiles())
+              .apply(ParDo.of(new MapCsvToStringArrayFn()))
+              .apply(ParDo.of(new MapStringArrayToUserDataFn<>(getCsvMapper())));
+
+      output.setCoder(getCoder());
+
+      emptyCollection
+          .apply(Wait.on(output))
+          .apply(
+              ParDo.of(
+                  new CleanTmpFilesFromGcsFn(
+                      getStagingBucketName(), gcpTmpDirName, getSnowflakeCloudProvider())));
+
+      return output;
+    }
+
+    private String makeTmpDirName() {
+      return String.format(
+          "sf_copy_csv_%s_%s",
+          new SimpleDateFormat("yyyyMMdd_HHmmss").format(new Date()),
+          UUID.randomUUID().toString().subSequence(0, 8) // first 8 chars of UUID should be enough
+          );
+    }
+
+    private static class CopyIntoStageFn extends DoFn<Object, String> {
+      private final SerializableFunction<Void, DataSource> dataSourceProviderFn;
+      private final String query;
+      private final String table;
+      private final String integrationName;
+      private final String stagingBucketName;
+      private final String tmpDirName;
+      private final SnowflakeService snowflakeService;
+      private final SnowflakeCloudProvider cloudProvider;
+
+      private CopyIntoStageFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn,
+          String query,
+          String table,
+          String integrationName,
+          String stagingBucketName,
+          String tmpDirName,
+          SnowflakeService snowflakeService,
+          SnowflakeCloudProvider cloudProvider) {
+        this.dataSourceProviderFn = dataSourceProviderFn;
+        this.query = query;
+        this.table = table;
+        this.integrationName = integrationName;
+        this.stagingBucketName = stagingBucketName;
+        this.tmpDirName = tmpDirName;
+        this.snowflakeService = snowflakeService;
+        this.cloudProvider = cloudProvider;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) throws Exception {
+        String stagingBucketDir = this.cloudProvider.formatCloudPath(stagingBucketName, tmpDirName);
+        String output =
+            snowflakeService.copyIntoStage(
+                dataSourceProviderFn,
+                query,
+                table,
+                integrationName,
+                stagingBucketDir,
+                tmpDirName,
+                this.cloudProvider);
+
+        context.output(output);
+      }
+    }
+
+    public static class MapCsvToStringArrayFn extends DoFn<String, String[]> {

Review comment:
       Sounds good.

##########
File path: sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/read/SnowflakeIOReadTest.java
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake.test.unit.read;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.io.AvroGeneratedUser;
+import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
+import org.apache.beam.sdk.io.snowflake.SnowflakeService;
+import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBasicDataSource;
+import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeDatabase;
+import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeServiceImpl;
+import org.apache.beam.sdk.io.snowflake.test.unit.BatchTestPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.junit.runners.model.Statement;
+
+@RunWith(JUnit4.class)
+public class SnowflakeIOReadTest {
+  public static final String FAKE_TABLE = "FAKE_TABLE";
+
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedException exceptionRule = ExpectedException.none();
+
+  private static SnowflakeIO.DataSourceConfiguration dataSourceConfiguration;
+  private static BatchTestPipelineOptions options;
+
+  private static SnowflakeService snowflakeService;
+
+  private static String stagingBucketName;
+  private static String integrationName;
+
+  private static List<GenericRecord> avroTestData;
+
+  private transient TemporaryFolder testFolder = new TemporaryFolder();
+
+  @BeforeClass
+  public static void setup() {
+
+    List<String> testData = Arrays.asList("Paul,51,red", "Jackson,41,green");
+
+    avroTestData =
+        ImmutableList.of(
+            new AvroGeneratedUser("Paul", 51, "red"),
+            new AvroGeneratedUser("Jackson", 41, "green"));
+
+    FakeSnowflakeDatabase.createTableWithElements(FAKE_TABLE, testData);
+    PipelineOptionsFactory.register(BatchTestPipelineOptions.class);
+    options = TestPipeline.testingPipelineOptions().as(BatchTestPipelineOptions.class);
+    options.setServerName("NULL.snowflakecomputing.com");
+    options.setStorageIntegration("STORAGE_INTEGRATION");
+    options.setStagingBucketName("BUCKET");
+
+    stagingBucketName = options.getStagingBucketName();
+    integrationName = options.getStorageIntegration();
+
+    dataSourceConfiguration =
+        SnowflakeIO.DataSourceConfiguration.create(new FakeSnowflakeBasicDataSource())
+            .withServerName(options.getServerName());
+
+    snowflakeService = new FakeSnowflakeServiceImpl();
+  }
+
+  @Rule
+  public final transient TestRule folderThenPipeline =
+      new TestRule() {
+        @Override
+        public Statement apply(final Statement base, final Description description) {
+          Statement withPipeline =
+              new Statement() {
+                @Override
+                public void evaluate() {
+                  pipeline = TestPipeline.fromOptions(options);
+                }
+              };
+          return testFolder.apply(withPipeline, description);
+        }
+      };
+
+  @Test
+  public void testConfigIsMissingStagingBucketName() {
+    exceptionRule.expect(IllegalArgumentException.class);
+    exceptionRule.expectMessage("withStagingBucketName() is required");
+
+    pipeline.apply(
+        SnowflakeIO.<GenericRecord>read(snowflakeService)
+            .withDataSourceConfiguration(dataSourceConfiguration)
+            .fromTable(FAKE_TABLE)
+            .withIntegrationName(integrationName)
+            .withCsvMapper(getCsvMapper())
+            .withCoder(AvroCoder.of(AvroGeneratedUser.getClassSchema())));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testConfigIsMissingIntegrationName() {
+    exceptionRule.expect(IllegalArgumentException.class);
+    exceptionRule.expectMessage("withIntegrationName() is required");
+
+    pipeline.apply(
+        SnowflakeIO.<GenericRecord>read(snowflakeService)
+            .withDataSourceConfiguration(dataSourceConfiguration)
+            .fromTable(FAKE_TABLE)
+            .withStagingBucketName(stagingBucketName)
+            .withCsvMapper(getCsvMapper())
+            .withCoder(AvroCoder.of(AvroGeneratedUser.getClassSchema())));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testConfigIsMissingCsvMapper() {
+    exceptionRule.expect(IllegalArgumentException.class);
+    exceptionRule.expectMessage("withCsvMapper() is required");
+
+    pipeline.apply(
+        SnowflakeIO.<GenericRecord>read(snowflakeService)
+            .withDataSourceConfiguration(dataSourceConfiguration)
+            .fromTable(FAKE_TABLE)
+            .withStagingBucketName(stagingBucketName)
+            .withIntegrationName(integrationName)
+            .withCoder(AvroCoder.of(AvroGeneratedUser.getClassSchema())));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testConfigIsMissingCoder() {
+    exceptionRule.expect(IllegalArgumentException.class);
+    exceptionRule.expectMessage("withCoder() is required");
+
+    pipeline.apply(
+        SnowflakeIO.<GenericRecord>read(snowflakeService)
+            .withDataSourceConfiguration(dataSourceConfiguration)
+            .fromTable(FAKE_TABLE)
+            .withStagingBucketName(stagingBucketName)
+            .withIntegrationName(integrationName)
+            .withCsvMapper(getCsvMapper()));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testConfigIsMissingFromTableOrFromQuery() {
+    exceptionRule.expect(IllegalArgumentException.class);
+    exceptionRule.expectMessage("fromTable() or fromQuery() is required");
+
+    pipeline.apply(
+        SnowflakeIO.<GenericRecord>read(snowflakeService)
+            .withDataSourceConfiguration(dataSourceConfiguration)
+            .withStagingBucketName(stagingBucketName)
+            .withIntegrationName(integrationName)
+            .withCsvMapper(getCsvMapper())
+            .withCoder(AvroCoder.of(AvroGeneratedUser.getClassSchema())));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testConfigIsMissingDataSourceConfiguration() {
+    exceptionRule.expect(IllegalArgumentException.class);
+    exceptionRule.expectMessage(
+        "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+    pipeline.apply(
+        SnowflakeIO.<GenericRecord>read(snowflakeService)
+            .fromTable(FAKE_TABLE)
+            .withStagingBucketName(stagingBucketName)
+            .withIntegrationName(integrationName)
+            .withCsvMapper(getCsvMapper())
+            .withCoder(AvroCoder.of(AvroGeneratedUser.getClassSchema())));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testConfigContainsFromQueryAndFromTable() {
+    exceptionRule.expect(IllegalArgumentException.class);
+    exceptionRule.expectMessage("fromTable() and fromQuery() are not allowed together");
+
+    pipeline.apply(
+        SnowflakeIO.<GenericRecord>read(snowflakeService)
+            .withDataSourceConfiguration(dataSourceConfiguration)
+            .fromQuery("")
+            .fromTable(FAKE_TABLE)
+            .withStagingBucketName(stagingBucketName)
+            .withIntegrationName(integrationName)
+            .withCsvMapper(getCsvMapper())
+            .withCoder(AvroCoder.of(AvroGeneratedUser.getClassSchema())));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testTableDoesntExist() {
+    exceptionRule.expect(PipelineExecutionException.class);
+    exceptionRule.expectMessage("SQL compilation error: Table does not exist");
+
+    pipeline.apply(
+        SnowflakeIO.<GenericRecord>read(snowflakeService)
+            .withDataSourceConfiguration(dataSourceConfiguration)
+            .fromTable("NON_EXIST")
+            .withStagingBucketName(stagingBucketName)
+            .withIntegrationName(integrationName)
+            .withCsvMapper(getCsvMapper())
+            .withCoder(AvroCoder.of(AvroGeneratedUser.getClassSchema())));
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testReadWithConfigIsProper() {
+    PCollection<GenericRecord> items =

Review comment:
       Please make sure that all read paths are covered by tests.
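
      For instance, a minimal sketch of an additional test exercising the fromQuery() read path,
      assuming the same fixtures used in the tests above (pipeline, snowflakeService,
      dataSourceConfiguration, stagingBucketName, integrationName, getCsvMapper()), plus a
      hypothetical avroTestData list of expected records, an imported
      org.apache.beam.sdk.testing.PAssert, and a fake service that can answer queries:

      @Test
      public void testReadFromQuery() {
        PCollection<GenericRecord> items =
            pipeline.apply(
                SnowflakeIO.<GenericRecord>read(snowflakeService)
                    .withDataSourceConfiguration(dataSourceConfiguration)
                    // Reads via a query over the same fake table used elsewhere in these tests.
                    .fromQuery("SELECT * FROM " + FAKE_TABLE)
                    .withStagingBucketName(stagingBucketName)
                    .withIntegrationName(integrationName)
                    .withCsvMapper(getCsvMapper())
                    .withCoder(AvroCoder.of(AvroGeneratedUser.getClassSchema())));

        // avroTestData is assumed to hold the GenericRecords expected from the fake table.
        PAssert.that(items).containsInAnyOrder(avroTestData);
        pipeline.run();
      }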

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -0,0 +1,735 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import static org.apache.beam.sdk.io.TextIO.readFiles;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.opencsv.CSVParser;
+import com.opencsv.CSVParserBuilder;
+import java.io.IOException;
+import java.io.Serializable;
+import java.security.PrivateKey;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import javax.sql.DataSource;
+import net.snowflake.client.jdbc.SnowflakeBasicDataSource;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.snowflake.credentials.KeyPairSnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.OAuthTokenSnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.UsernamePasswordSnowflakeCredentials;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Wait;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * IO to read and write data on Snowflake.
+ *
+ * <p>SnowflakeIO uses <a href="https://docs.snowflake.net/manuals/user-guide/jdbc.html">Snowflake
+ * JDBC</a> driver under the hood, but data isn't read/written using JDBC directly. Instead,
+ * SnowflakeIO uses dedicated <b>COPY</b> operations to read/write data from/to a cloud bucket.
+ * Currently only Google Cloud Storage is supported.
+ *
+ * <p>To configure SnowflakeIO to read/write from your Snowflake instance, you have to provide a
+ * {@link DataSourceConfiguration} using {@link
+ * DataSourceConfiguration#create(SnowflakeCredentials)}, where {@link SnowflakeCredentials} might be
+ * created using {@link org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory}.
+ * Additionally one of {@link DataSourceConfiguration#withServerName(String)} or {@link
+ * DataSourceConfiguration#withUrl(String)} must be used to tell SnowflakeIO which instance to use.
+ * <br>
+ * There are also other options available to configure the connection to Snowflake:
+ *
+ * <ul>
+ *   <li>{@link DataSourceConfiguration#withWarehouse(String)} to specify which Warehouse to use
+ *   <li>{@link DataSourceConfiguration#withDatabase(String)} to specify which Database to connect
+ *       to
+ *   <li>{@link DataSourceConfiguration#withSchema(String)} to specify which schema to use
+ *   <li>{@link DataSourceConfiguration#withRole(String)} to specify which role to use
+ *   <li>{@link DataSourceConfiguration#withLoginTimeout(Integer)} to specify the timeout for the
+ *       login
+ *   <li>{@link DataSourceConfiguration#withPortNumber(Integer)} to specify custom port of Snowflake
+ *       instance
+ * </ul>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * SnowflakeIO.DataSourceConfiguration dataSourceConfiguration =
+ *     SnowflakeIO.DataSourceConfiguration.create(SnowflakeCredentialsFactory.of(options))
+ *         .withServerName(options.getServerName())
+ *         .withWarehouse(options.getWarehouse())
+ *         .withDatabase(options.getDatabase())
+ *         .withSchema(options.getSchema());
+ * }</pre>
+ *
+ * <h3>Reading from Snowflake</h3>
+ *
+ * <p>SnowflakeIO.Read returns a bounded collection of {@code T} as a {@code PCollection<T>}. T is
+ * the type returned by the provided {@link CsvMapper}.
+ *
+ * <p>For example
+ *
+ * <pre>{@code
+ * PCollection<GenericRecord> items = pipeline.apply(
+ *  SnowflakeIO.<GenericRecord>read()
+ *    .withDataSourceConfiguration(dataSourceConfiguration)
+ *    .fromQuery(QUERY)
+ *    .withStagingBucketName(stagingBucketName)
+ *    .withIntegrationName(integrationName)
+ *    .withCsvMapper(...)
+ *    .withCoder(...));
+ * }</pre>
+ *
+ * <p><b>Important</b> When reading data from Snowflake, temporary CSV files are created under the
+ * specified stagingBucketName in a directory named `sf_copy_csv_[RANDOM CHARS]_[TIMESTAMP]`. This
+ * directory and all its files are deleted automatically by default, but if the pipeline fails they
+ * will remain and have to be removed manually.
+ */
+public class SnowflakeIO {
+  private static final Logger LOG = LoggerFactory.getLogger(SnowflakeIO.class);
+
+  private static final String CSV_QUOTE_CHAR = "'";
+  /**
+   * Read data from Snowflake.
+   *
+   * @param snowflakeService user-defined {@link SnowflakeService}
+   * @param <T> Type of the data to be read.
+   */
+  public static <T> Read<T> read(SnowflakeService snowflakeService) {
+    return new AutoValue_SnowflakeIO_Read.Builder<T>()
+        .setSnowflakeService(snowflakeService)
+        .build();
+  }
+
+  /**
+   * Read data from Snowflake.
+   *
+   * @param <T> Type of the data to be read.
+   */
+  public static <T> Read<T> read() {
+    return read(new SnowflakeServiceImpl());
+  }
+
+  /**
+   * Interface for a user-defined function that maps parts of a CSV line into T. Used by
+   * SnowflakeIO.Read.
+   *
+   * @param <T> Type of data to be read.
+   */
+  @FunctionalInterface
+  public interface CsvMapper<T> extends Serializable {
+    T mapRow(String[] parts) throws Exception;
+  }
+
+  /** Implementation of {@link #read()}. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getIntegrationName();
+
+    @Nullable
+    abstract String getStagingBucketName();
+
+    @Nullable
+    abstract CsvMapper<T> getCsvMapper();
+
+    @Nullable
+    abstract Coder<T> getCoder();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setIntegrationName(String integrationName);
+
+      abstract Builder<T> setStagingBucketName(String stagingBucketName);
+
+      abstract Builder<T> setCsvMapper(CsvMapper<T> csvMapper);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Read<T> build();
+    }
+
+    /**
+     * Sets information about the Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Read<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+
+      try {
+        Connection connection = config.buildDatasource().getConnection();
+        connection.close();
+      } catch (SQLException e) {
+        throw new IllegalArgumentException(
+            "Invalid DataSourceConfiguration. Underlying cause: " + e);
+      }
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Sets a function that will provide a {@link DataSource} at runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Read<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Read<T> fromQuery(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A table name to be read in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Read<T> fromTable(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * Name of the cloud bucket (currently only GCS) to use as a temporary location for CSV files
+     * during the COPY statement.
+     *
+     * @param stagingBucketName - String with the name of the bucket.
+     */
+    public Read<T> withStagingBucketName(String stagingBucketName) {
+      return toBuilder().setStagingBucketName(stagingBucketName).build();
+    }
+
+    /**
+     * Name of the Storage Integration in Snowflake to be used. See
+     * https://docs.snowflake.com/en/sql-reference/sql/create-storage-integration.html for
+     * reference.
+     *
+     * @param integrationName - String with the name of the Storage Integration.
+     */
+    public Read<T> withIntegrationName(String integrationName) {
+      return toBuilder().setIntegrationName(integrationName).build();
+    }
+
+    /**
+     * User-defined function mapping CSV lines into user data.
+     *
+     * @param csvMapper - an instance of {@link CsvMapper}.
+     */
+    public Read<T> withCsvMapper(CsvMapper<T> csvMapper) {
+      return toBuilder().setCsvMapper(csvMapper).build();
+    }
+
+    /**
+     * A Coder to be used by the output PCollection generated by the source.
+     *
+     * @param coder - an instance of {@link Coder}.
+     */
+    public Read<T> withCoder(Coder<T> coder) {
+      return toBuilder().setCoder(coder).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      // Either a table or a query is required. If a query is present it is used; otherwise the
+      // table is used.
+      checkArgument(
+          getQuery() != null || getTable() != null, "fromTable() or fromQuery() is required");
+      checkArgument(
+          !(getQuery() != null && getTable() != null),
+          "fromTable() and fromQuery() are not allowed together");
+      checkArgument(getCsvMapper() != null, "withCsvMapper() is required");
+      checkArgument(getCoder() != null, "withCoder() is required");
+      checkArgument(getIntegrationName() != null, "withIntegrationName() is required");
+      checkArgument(getStagingBucketName() != null, "withStagingBucketName() is required");
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      String gcpTmpDirName = makeTmpDirName();
+      String stagingBucketDir = String.format("%s/%s/", getStagingBucketName(), gcpTmpDirName);
+
+      PCollection<Void> emptyCollection = input.apply(Create.of((Void) null));
+
+      PCollection<T> output =
+          emptyCollection
+              .apply(
+                  ParDo.of(
+                      new CopyIntoStageFn(
+                          getDataSourceProviderFn(),
+                          getQuery(),
+                          getTable(),
+                          getIntegrationName(),
+                          stagingBucketDir,
+                          getSnowflakeService())))
+              .apply(FileIO.matchAll())
+              .apply(FileIO.readMatches())
+              .apply(readFiles())
+              .apply(ParDo.of(new MapCsvToStringArrayFn()))
+              .apply(ParDo.of(new MapStringArrayToUserDataFn<>(getCsvMapper())));
+
+      output.setCoder(getCoder());
+
+      emptyCollection
+          .apply(Wait.on(output))
+          .apply(ParDo.of(new CleanTmpFilesFromGcsFn(stagingBucketDir)));
+
+      return output;
+    }
+
+    private String makeTmpDirName() {
+      return String.format(
+          "sf_copy_csv_%s_%s",
+          new SimpleDateFormat("yyyyMMdd_HHmmss").format(new Date()),
+          UUID.randomUUID().toString().subSequence(0, 8) // first 8 chars of UUID should be enough
+          );
+    }
+
+    private static class CopyIntoStageFn extends DoFn<Object, String> {
+      private final SerializableFunction<Void, DataSource> dataSourceProviderFn;
+      private final String query;
+      private final String table;
+      private final String integrationName;
+      private final String stagingBucketDir;
+      private final SnowflakeService snowflakeService;
+
+      private CopyIntoStageFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn,
+          String query,
+          String table,
+          String integrationName,
+          String stagingBucketDir,
+          SnowflakeService snowflakeService) {
+        this.dataSourceProviderFn = dataSourceProviderFn;
+        this.query = query;
+        this.table = table;
+        this.integrationName = integrationName;
+        this.stagingBucketDir = stagingBucketDir;
+        this.snowflakeService = snowflakeService;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) throws Exception {
+        String output =
+            snowflakeService.copyIntoStage(
+                dataSourceProviderFn, query, table, integrationName, stagingBucketDir);
+
+        context.output(output);
+      }
+    }
+
+    public static class MapCsvToStringArrayFn extends DoFn<String, String[]> {
+      @ProcessElement
+      public void processElement(ProcessContext c) throws IOException {
+        String csvLine = c.element();
+        CSVParser parser = new CSVParserBuilder().withQuoteChar(CSV_QUOTE_CHAR.charAt(0)).build();
+        String[] parts = parser.parseLine(csvLine);
+        c.output(parts);
+      }
+    }
+
+    private static class MapStringArrayToUserDataFn<T> extends DoFn<String[], T> {
+      private final CsvMapper<T> csvMapper;
+
+      public MapStringArrayToUserDataFn(CsvMapper<T> csvMapper) {
+        this.csvMapper = csvMapper;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) throws Exception {
+        context.output(csvMapper.mapRow(context.element()));
+      }
+    }
+
+    public static class CleanTmpFilesFromGcsFn extends DoFn<Object, Object> {
+      private final String stagingBucketDir;
+
+      public CleanTmpFilesFromGcsFn(String stagingBucketDir) {
+        this.stagingBucketDir = stagingBucketDir;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) throws IOException {
+        String combinedPath = stagingBucketDir + "*";
+        List<ResourceId> paths =
+            FileSystems.match(combinedPath).metadata().stream()
+                .map(metadata -> metadata.resourceId())
+                .collect(Collectors.toList());
+
+        FileSystems.delete(paths);

Review comment:
       Probably specify MoveOptions.IGNORE_MISSING_FILES so this does not fail in case this bundle is retried after a failure (either the source or the destination should exist).
   https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MoveOptions.java#L30
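
   A minimal sketch of that change inside CleanTmpFilesFromGcsFn.processElement(), assuming the
   existing `paths` list and an added import of org.apache.beam.sdk.io.fs.MoveOptions:

        // No-op for temporary files already removed by a previously failed and retried bundle.
        FileSystems.delete(paths, MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);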




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org