Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/22 08:05:05 UTC

[GitHub] [beam] purbanow opened a new pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

purbanow opened a new pull request #11794:
URL: https://github.com/apache/beam/pull/11794


   This PR is part of adding SnowflakeIO to the Java SDK [BEAM-9893](https://issues.apache.org/jira/browse/BEAM-9893). Specifically, it adds the write operation to SnowflakeIO [BEAM-9894](https://issues.apache.org/jira/browse/BEAM-9894).
   
   SnowflakeIO.Write works by staging data on GCS as CSV files and then using Snowflake's JDBC driver to run a [COPY INTO TABLE](https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html) statement that loads the CSV files from GCS into the target Snowflake table.
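
   For illustration only, a pipeline using this transform could look roughly like the sketch below. The `rows`, `dataSourceConfig`, `gcsLocation`, `rowMapper` and `writeDisposition` values (and the `MyRow` element type) are placeholders whose construction is not shown here, and the exact setter names evolved a bit during review; the builder methods used are the ones from the diff in this PR.

   ```java
   // Hedged usage sketch of the new batch SnowflakeIO.Write; all lower-case
   // identifiers and MyRow are placeholders assumed to exist elsewhere.
   rows.apply(
       "Write to Snowflake",
       SnowflakeIO.<MyRow>write()
           .withDataSourceConfiguration(dataSourceConfig) // Snowflake connection settings
           .to("MY_TABLE")                                // target table in Snowflake
           .via(gcsLocation)                              // storage integration + GCS staging bucket
           .withUserDataMapper(rowMapper)                 // maps MyRow -> Object[] (one CSV line)
           .withFileNameTemplate("output")                // prefix for the staged CSV files
           .withWriteDisposition(writeDisposition));      // behaviour when the table already exists
   ```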
   
   As mentioned in the previous [PR](https://github.com/apache/beam/pull/11360), subsequent PRs will add integration tests, streaming and cross-language support.
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/)
   XLang | --- | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/)
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) 
   Portable | --- | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r429105275



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeServiceImpl.java
##########
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.snowflake;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.function.Consumer;
-import javax.sql.DataSource;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-
-/**
- * Implemenation of {@link org.apache.beam.sdk.io.snowflake.SnowflakeService} used in production.
- */
-public class SnowflakeServiceImpl implements SnowflakeService {

Review comment:
       Note: This file was moved to `services/SnowflakeServiceImpl.java`.







[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r433620166



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -447,6 +494,346 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #write()}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract Location getLocation();
+
+    @Nullable
+    abstract String getFileNameTemplate();
+
+    @Nullable
+    abstract WriteDisposition getWriteDisposition();
+
+    @Nullable
+    abstract UserDataMapper getUserDataMapper();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setLocation(Location location);
+
+      abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
+
+      abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+
+      abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Write<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Write<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Write<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A table name to be written in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Write<T> to(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Write<T> withQueryTransformation(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A location object which contains connection config between Snowflake and GCP.
+     *
+     * @param location - an instance of {@link Location}.
+     */
+    public Write<T> via(Location location) {
+      return toBuilder().setLocation(location).build();
+    }
+
+    /**
+     * A template name for files saved to GCP.
+     *
+     * @param fileNameTemplate - String with template name for files.
+     */
+    public Write<T> withFileNameTemplate(String fileNameTemplate) {
+      return toBuilder().setFileNameTemplate(fileNameTemplate).build();
+    }
+
+    /**
+     * User-defined function mapping user data into CSV lines.
+     *
+     * @param userDataMapper - an instance of {@link UserDataMapper}.
+     */
+    public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+      return toBuilder().setUserDataMapper(userDataMapper).build();
+    }
+
+    /**
+     * A disposition to be used during writing to table phase.
+     *
+     * @param writeDisposition - an instance of {@link WriteDisposition}.
+     */
+    public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
+      return toBuilder().setWriteDisposition(writeDisposition).build();
+    }
+
+    /**
+     * A snowflake service which is supposed to be used. Note: Currently we have {@link
+     * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
+     *
+     * @param snowflakeService - an instance of {@link SnowflakeService}.
+     */
+    public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
+      return toBuilder().setSnowflakeService(snowflakeService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      Location loc = getLocation();
+      checkArguments(loc);
+
+      String stagingBucketDir = String.format("%s/%s/", loc.getStagingBucketName(), WRITE_TMP_PATH);
+
+      PCollection out = write(input, stagingBucketDir);
+      out.setCoder(StringUtf8Coder.of());
+
+      return PDone.in(out.getPipeline());
+    }
+
+    private void checkArguments(Location loc) {
+      checkArgument(loc != null, "via() is required");
+      checkArgument(
+          loc.getStorageIntegrationName() != null,
+          "location with storageIntegrationName is required");
+      checkArgument(
+          loc.getStagingBucketName() != null, "location with stagingBucketName is required");
+
+      checkArgument(getUserDataMapper() != null, "withUserDataMapper() is required");
+
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      checkArgument(getTable() != null, "withTable() is required");
+    }
+
+    private PCollection write(PCollection input, String stagingBucketDir) {
+      SnowflakeService snowflakeService =
+          getSnowflakeService() != null ? getSnowflakeService() : new SnowflakeServiceImpl();
+
+      PCollection files = writeFiles(input, stagingBucketDir);
+
+      files =
+          (PCollection)
+              files.apply("Create list of files to copy", Combine.globally(new Concatenate()));
+
+      return (PCollection)
+          files.apply("Copy files to table", copyToTable(snowflakeService, stagingBucketDir));
+    }
+
+    private PCollection writeFiles(PCollection<T> input, String stagingBucketDir) {
+      class Parse extends DoFn<KV<T, String>, String> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          c.output(c.element().getValue());
+        }
+      }
+
+      PCollection mappedUserData =

Review comment:
       I checked the whole code and added the missing types :)







[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r432423695



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -257,23 +301,12 @@
     }
 
     /**
-     * Name of the cloud bucket (GCS by now) to use as tmp location of CSVs during COPY statement.
-     *
-     * @param stagingBucketName - String with the name of the bucket.
-     */
-    public Read<T> withStagingBucketName(String stagingBucketName) {
-      return toBuilder().setStagingBucketName(stagingBucketName).build();
-    }
-
-    /**
-     * Name of the Storage Integration in Snowflake to be used. See
-     * https://docs.snowflake.com/en/sql-reference/sql/create-storage-integration.html for
-     * reference.
+     * A location object which contains connection config between Snowflake and GCP.
      *
-     * @param integrationName - String with the name of the Storage Integration.
+     * @param location - an instance of {@link Location}.
      */
-    public Read<T> withIntegrationName(String integrationName) {
-      return toBuilder().setIntegrationName(integrationName).build();
+    public Read<T> via(Location location) {

Review comment:
       No reason. I changed it to `withLocation` to be more consistent.







[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r433627687



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -447,6 +494,346 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #write()}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract Location getLocation();
+
+    @Nullable
+    abstract String getFileNameTemplate();
+
+    @Nullable
+    abstract WriteDisposition getWriteDisposition();
+
+    @Nullable
+    abstract UserDataMapper getUserDataMapper();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setLocation(Location location);
+
+      abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
+
+      abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+
+      abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Write<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Write<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Write<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A table name to be written in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Write<T> to(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Write<T> withQueryTransformation(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A location object which contains connection config between Snowflake and GCP.
+     *
+     * @param location - an instance of {@link Location}.
+     */
+    public Write<T> via(Location location) {
+      return toBuilder().setLocation(location).build();
+    }
+
+    /**
+     * A template name for files saved to GCP.
+     *
+     * @param fileNameTemplate - String with template name for files.
+     */
+    public Write<T> withFileNameTemplate(String fileNameTemplate) {
+      return toBuilder().setFileNameTemplate(fileNameTemplate).build();
+    }
+
+    /**
+     * User-defined function mapping user data into CSV lines.
+     *
+     * @param userDataMapper - an instance of {@link UserDataMapper}.
+     */
+    public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+      return toBuilder().setUserDataMapper(userDataMapper).build();
+    }
+
+    /**
+     * A disposition to be used during writing to table phase.
+     *
+     * @param writeDisposition - an instance of {@link WriteDisposition}.
+     */
+    public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
+      return toBuilder().setWriteDisposition(writeDisposition).build();
+    }
+
+    /**
+     * A snowflake service which is supposed to be used. Note: Currently we have {@link
+     * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
+     *
+     * @param snowflakeService - an instance of {@link SnowflakeService}.
+     */
+    public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
+      return toBuilder().setSnowflakeService(snowflakeService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      Location loc = getLocation();
+      checkArguments(loc);
+
+      String stagingBucketDir = String.format("%s/%s/", loc.getStagingBucketName(), WRITE_TMP_PATH);
+
+      PCollection out = write(input, stagingBucketDir);
+      out.setCoder(StringUtf8Coder.of());
+
+      return PDone.in(out.getPipeline());
+    }
+
+    private void checkArguments(Location loc) {
+      checkArgument(loc != null, "via() is required");
+      checkArgument(
+          loc.getStorageIntegrationName() != null,
+          "location with storageIntegrationName is required");
+      checkArgument(
+          loc.getStagingBucketName() != null, "location with stagingBucketName is required");
+
+      checkArgument(getUserDataMapper() != null, "withUserDataMapper() is required");
+
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      checkArgument(getTable() != null, "withTable() is required");
+    }
+
+    private PCollection write(PCollection input, String stagingBucketDir) {
+      SnowflakeService snowflakeService =
+          getSnowflakeService() != null ? getSnowflakeService() : new SnowflakeServiceImpl();
+
+      PCollection files = writeFiles(input, stagingBucketDir);
+
+      files =
+          (PCollection)
+              files.apply("Create list of files to copy", Combine.globally(new Concatenate()));
+
+      return (PCollection)
+          files.apply("Copy files to table", copyToTable(snowflakeService, stagingBucketDir));
+    }
+
+    private PCollection writeFiles(PCollection<T> input, String stagingBucketDir) {
+      class Parse extends DoFn<KV<T, String>, String> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          c.output(c.element().getValue());
+        }
+      }
+
+      PCollection mappedUserData =
+          input
+              .apply(
+                  "Map user data to Objects array",
+                  ParDo.of(new MapUserDataObjectsArrayFn<T>(getUserDataMapper())))
+              .apply("Map Objects array to CSV lines", ParDo.of(new MapObjectsArrayToCsvFn()))
+              .setCoder(StringUtf8Coder.of());
+
+      WriteFilesResult filesResult =
+          (WriteFilesResult)
+              mappedUserData.apply(
+                  "Write files to specified location",
+                  FileIO.write()
+                      .via((FileIO.Sink) new CSVSink())
+                      .to(stagingBucketDir)
+                      .withPrefix(getFileNameTemplate())
+                      .withSuffix(".csv")
+                      .withPrefix(UUID.randomUUID().toString().subSequence(0, 8).toString())
+                      .withCompression(Compression.GZIP));
+
+      return (PCollection)
+          filesResult
+              .getPerDestinationOutputFilenames()
+              .apply("Parse KV filenames to Strings", ParDo.of(new Parse()));
+    }
+
+    private ParDo.SingleOutput<Object, Object> copyToTable(
+        SnowflakeService snowflakeService, String stagingBucketDir) {
+      return ParDo.of(
+          new CopyToTableFn<>(
+              getDataSourceProviderFn(),
+              getTable(),
+              getQuery(),
+              stagingBucketDir,
+              getLocation(),
+              getWriteDisposition(),
+              snowflakeService));
+    }
+  }
+
+  public static class Concatenate extends Combine.CombineFn<String, List<String>, List<String>> {
+    @Override
+    public List<String> createAccumulator() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public List<String> addInput(List<String> mutableAccumulator, String input) {
+      mutableAccumulator.add(String.format("'%s'", input));
+      return mutableAccumulator;
+    }
+
+    @Override
+    public List<String> mergeAccumulators(Iterable<List<String>> accumulators) {
+      List<String> result = createAccumulator();
+      for (List<String> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
+
+    @Override
+    public List<String> extractOutput(List<String> accumulator) {
+      return accumulator;
+    }
+  }
+
+  private static class MapUserDataObjectsArrayFn<T> extends DoFn<T, Object[]> {
+    private final UserDataMapper<T> csvMapper;
+
+    public MapUserDataObjectsArrayFn(UserDataMapper<T> csvMapper) {
+      this.csvMapper = csvMapper;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) throws Exception {
+      context.output(csvMapper.mapRow(context.element()));
+    }
+  }
+
+  /**
+   * Custom DoFn that maps {@link Object[]} into CSV line to be saved to Snowflake.
+   *
+   * <p>Adds Snowflake-specific quotations around strings.
+   */
+  private static class MapObjectsArrayToCsvFn extends DoFn<Object[], String> {
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      List<Object> csvItems = new ArrayList<>();
+      for (Object o : context.element()) {
+        if (o instanceof String) {
+          String field = (String) o;
+          field = field.replace("'", "''");
+          field = quoteField(field);
+
+          csvItems.add(field);
+        } else {
+          csvItems.add(o);

Review comment:
       Based on `If the data contains single or double quotes, then those quotes must be escaped.`, the proper form is `'I can\'t believe it'`.
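
       A tiny Java illustration of the two styles (it only shows what each replacement produces; which style Snowflake's loader actually requires is the open question here):

       ```java
       String raw = "I can't believe it";
       // Style used in the diff above: double the single quote.
       String doubled = "'" + raw.replace("'", "''") + "'";       // -> 'I can''t believe it'
       // Style quoted from the Snowflake docs: backslash-escape it.
       String backslashed = "'" + raw.replace("'", "\\'") + "'";  // -> 'I can\'t believe it'
       ```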







[GitHub] [beam] RyanSkraba commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
RyanSkraba commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r443687101



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -447,6 +513,346 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #write()}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getStorageIntegrationName();
+
+    @Nullable
+    abstract String getStagingBucketName();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract String getFileNameTemplate();
+
+    @Nullable
+    abstract WriteDisposition getWriteDisposition();
+
+    @Nullable
+    abstract UserDataMapper getUserDataMapper();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setStorageIntegrationName(String storageIntegrationName);
+
+      abstract Builder<T> setStagingBucketName(String stagingBucketName);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
+
+      abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+
+      abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Write<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Write<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Write<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A table name to be written in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Write<T> withTable(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * Name of the cloud bucket (GCS by now) to use as tmp location of CSVs during COPY statement.
+     *
+     * @param stagingBucketName - String with the name of the bucket.
+     */
+    public Write<T> withStagingBucketName(String stagingBucketName) {
+      return toBuilder().setStagingBucketName(stagingBucketName).build();
+    }
+
+    /**
+     * Name of the Storage Integration in Snowflake to be used. See
+     * https://docs.snowflake.com/en/sql-reference/sql/create-storage-integration.html for
+     * reference.
+     *
+     * @param integrationName - String with the name of the Storage Integration.
+     */
+    public Write<T> withStorageIntegrationName(String integrationName) {
+      return toBuilder().setStorageIntegrationName(integrationName).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Write<T> withQueryTransformation(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A template name for files saved to GCP.
+     *
+     * @param fileNameTemplate - String with template name for files.
+     */
+    public Write<T> withFileNameTemplate(String fileNameTemplate) {
+      return toBuilder().setFileNameTemplate(fileNameTemplate).build();
+    }
+
+    /**
+     * User-defined function mapping user data into CSV lines.
+     *
+     * @param userDataMapper - an instance of {@link UserDataMapper}.
+     */
+    public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+      return toBuilder().setUserDataMapper(userDataMapper).build();
+    }
+
+    /**
+     * A disposition to be used during writing to table phase.
+     *
+     * @param writeDisposition - an instance of {@link WriteDisposition}.
+     */
+    public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
+      return toBuilder().setWriteDisposition(writeDisposition).build();
+    }
+
+    /**
+     * A snowflake service which is supposed to be used. Note: Currently we have {@link
+     * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
+     *
+     * @param snowflakeService - an instance of {@link SnowflakeService}.
+     */
+    public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
+      return toBuilder().setSnowflakeService(snowflakeService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      checkArguments();
+
+      String stagingBucketDir = String.format("%s/%s/", getStagingBucketName(), WRITE_TMP_PATH);
+
+      PCollection<String> out = write(input, stagingBucketDir);
+      out.setCoder(StringUtf8Coder.of());
+
+      return PDone.in(out.getPipeline());
+    }
+
+    private void checkArguments() {
+      checkArgument(getStagingBucketName() != null, "withStagingBucketName is required");
+
+      checkArgument(getUserDataMapper() != null, "withUserDataMapper() is required");
+
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      checkArgument(getTable() != null, "withTable() is required");
+    }
+
+    private PCollection<String> write(PCollection<T> input, String stagingBucketDir) {
+      SnowflakeService snowflakeService =
+          getSnowflakeService() != null ? getSnowflakeService() : new SnowflakeServiceImpl();
+
+      PCollection<String> files = writeFiles(input, stagingBucketDir);
+
+      // Combining PCollection of files as a side input into one list of files
+      ListCoder<String> coder = ListCoder.of(StringUtf8Coder.of());
+      files =
+          (PCollection)
+              files
+                  .getPipeline()
+                  .apply(
+                      Reify.viewInGlobalWindow(
+                          (PCollectionView) files.apply(View.asList()), coder));
+
+      return (PCollection)
+          files.apply("Copy files to table", copyToTable(snowflakeService, stagingBucketDir));
+    }
+
+    private PCollection<String> writeFiles(PCollection<T> input, String stagingBucketDir) {
+
+      PCollection<String> mappedUserData =
+          input
+              .apply(
+                  MapElements.via(
+                      new SimpleFunction<T, Object[]>() {
+                        @Override
+                        public Object[] apply(T element) {
+                          return getUserDataMapper().mapRow(element);
+                        }
+                      }))
+              .apply("Map Objects array to CSV lines", ParDo.of(new MapObjectsArrayToCsvFn()))
+              .setCoder(StringUtf8Coder.of());
+
+      WriteFilesResult filesResult =
+          mappedUserData.apply(
+              "Write files to specified location",
+              FileIO.<String>write()
+                  .via(TextIO.sink())
+                  .to(stagingBucketDir)
+                  .withPrefix(getFileNameTemplate())
+                  .withSuffix(".csv")
+                  .withCompression(Compression.GZIP));
+
+      return (PCollection)
+          filesResult
+              .getPerDestinationOutputFilenames()
+              .apply("Parse KV filenames to Strings", Values.<String>create());
+    }
+
+    private ParDo.SingleOutput<Object, Object> copyToTable(
+        SnowflakeService snowflakeService, String stagingBucketDir) {
+      return ParDo.of(
+          new CopyToTableFn<>(
+              getDataSourceProviderFn(),
+              getTable(),
+              getQuery(),
+              stagingBucketDir,
+              getStorageIntegrationName(),
+              getWriteDisposition(),
+              snowflakeService));
+    }
+  }
+
+  public static class Concatenate extends Combine.CombineFn<String, List<String>, List<String>> {
+    @Override
+    public List<String> createAccumulator() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public List<String> addInput(List<String> mutableAccumulator, String input) {
+      mutableAccumulator.add(String.format("'%s'", input));
+      return mutableAccumulator;
+    }
+
+    @Override
+    public List<String> mergeAccumulators(Iterable<List<String>> accumulators) {
+      List<String> result = createAccumulator();
+      for (List<String> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
+
+    @Override
+    public List<String> extractOutput(List<String> accumulator) {
+      return accumulator;
+    }
+  }
+
+  /**
+   * Custom DoFn that maps {@link Object[]} into CSV line to be saved to Snowflake.
+   *
+   * <p>Adds Snowflake-specific quotations around strings.
+   */
+  private static class MapObjectsArrayToCsvFn extends DoFn<Object[], String> {
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      List<Object> csvItems = new ArrayList<>();
+      for (Object o : context.element()) {
+        if (o instanceof String) {
+          String field = (String) o;
+          field = field.replace("'", "''");
+          field = quoteField(field);
+
+          csvItems.add(field);
+        } else {
+          csvItems.add(o);
+        }
+      }
+      context.output(Joiner.on(",").useForNull("").join(csvItems));

Review comment:
       This is where there's an implicit `.toString()` on the objects in the array -- this is still pretty dangerous for many non-primitive classes!
   
    I can think of a couple of solutions: (1) Use something other than CSV as the file for copying into Snowflake, (2) Add a warning to the data mapper doc that the toStrings have to be coherent!
   
   For example, if my data mapper function returns `new Object[] {Arrays.asList(1,',',"\n")}` to insert, it's almost certainly going to break the function.
   
   I can see this happening, for example, if a user thinks that returning a JsonObject will insert the JSON as a string into that column.
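
    To make option (2) concrete, here is a minimal sketch of a "coherent" mapper: every array element is either a primitive or a String the user has already formatted, so the implicit toString() cannot mangle anything. `MyEvent`, its getters and `toJsonString()` are invented for the example, and it assumes `UserDataMapper` can be written as a lambda.

    ```java
    // Hypothetical mapper that only emits CSV-safe values (MyEvent and
    // toJsonString() are invented for illustration).
    UserDataMapper<MyEvent> mapper =
        event ->
            new Object[] {
              event.getId(),                      // long -> toString() is unambiguous
              event.getName(),                    // String -> quoted/escaped by MapObjectsArrayToCsvFn
              event.getPayload().toJsonString()   // serialize complex objects to a String yourself
            };
    ```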

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -447,6 +494,346 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #write()}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract Location getLocation();
+
+    @Nullable
+    abstract String getFileNameTemplate();
+
+    @Nullable
+    abstract WriteDisposition getWriteDisposition();
+
+    @Nullable
+    abstract UserDataMapper getUserDataMapper();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setLocation(Location location);
+
+      abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
+
+      abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+
+      abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Write<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Write<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Write<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A table name to be written in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Write<T> to(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Write<T> withQueryTransformation(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A location object which contains connection config between Snowflake and GCP.
+     *
+     * @param location - an instance of {@link Location}.
+     */
+    public Write<T> via(Location location) {
+      return toBuilder().setLocation(location).build();
+    }
+
+    /**
+     * A template name for files saved to GCP.
+     *
+     * @param fileNameTemplate - String with template name for files.
+     */
+    public Write<T> withFileNameTemplate(String fileNameTemplate) {
+      return toBuilder().setFileNameTemplate(fileNameTemplate).build();
+    }
+
+    /**
+     * User-defined function mapping user data into CSV lines.
+     *
+     * @param userDataMapper - an instance of {@link UserDataMapper}.
+     */
+    public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+      return toBuilder().setUserDataMapper(userDataMapper).build();
+    }
+
+    /**
+     * A disposition to be used during writing to table phase.
+     *
+     * @param writeDisposition - an instance of {@link WriteDisposition}.
+     */
+    public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
+      return toBuilder().setWriteDisposition(writeDisposition).build();
+    }
+
+    /**
+     * A snowflake service which is supposed to be used. Note: Currently we have {@link
+     * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
+     *
+     * @param snowflakeService - an instance of {@link SnowflakeService}.
+     */
+    public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
+      return toBuilder().setSnowflakeService(snowflakeService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      Location loc = getLocation();
+      checkArguments(loc);
+
+      String stagingBucketDir = String.format("%s/%s/", loc.getStagingBucketName(), WRITE_TMP_PATH);
+
+      PCollection out = write(input, stagingBucketDir);
+      out.setCoder(StringUtf8Coder.of());
+
+      return PDone.in(out.getPipeline());
+    }
+
+    private void checkArguments(Location loc) {
+      checkArgument(loc != null, "via() is required");
+      checkArgument(
+          loc.getStorageIntegrationName() != null,
+          "location with storageIntegrationName is required");
+      checkArgument(
+          loc.getStagingBucketName() != null, "location with stagingBucketName is required");
+
+      checkArgument(getUserDataMapper() != null, "withUserDataMapper() is required");
+
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      checkArgument(getTable() != null, "withTable() is required");
+    }
+
+    private PCollection write(PCollection input, String stagingBucketDir) {
+      SnowflakeService snowflakeService =
+          getSnowflakeService() != null ? getSnowflakeService() : new SnowflakeServiceImpl();
+
+      PCollection files = writeFiles(input, stagingBucketDir);
+
+      files =
+          (PCollection)
+              files.apply("Create list of files to copy", Combine.globally(new Concatenate()));
+
+      return (PCollection)
+          files.apply("Copy files to table", copyToTable(snowflakeService, stagingBucketDir));
+    }
+
+    private PCollection writeFiles(PCollection<T> input, String stagingBucketDir) {
+      class Parse extends DoFn<KV<T, String>, String> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          c.output(c.element().getValue());
+        }
+      }
+
+      PCollection mappedUserData =
+          input
+              .apply(
+                  "Map user data to Objects array",
+                  ParDo.of(new MapUserDataObjectsArrayFn<T>(getUserDataMapper())))
+              .apply("Map Objects array to CSV lines", ParDo.of(new MapObjectsArrayToCsvFn()))
+              .setCoder(StringUtf8Coder.of());
+
+      WriteFilesResult filesResult =
+          (WriteFilesResult)
+              mappedUserData.apply(
+                  "Write files to specified location",
+                  FileIO.write()
+                      .via((FileIO.Sink) new CSVSink())
+                      .to(stagingBucketDir)
+                      .withPrefix(getFileNameTemplate())
+                      .withSuffix(".csv")
+                      .withPrefix(UUID.randomUUID().toString().subSequence(0, 8).toString())
+                      .withCompression(Compression.GZIP));
+
+      return (PCollection)
+          filesResult
+              .getPerDestinationOutputFilenames()
+              .apply("Parse KV filenames to Strings", ParDo.of(new Parse()));
+    }
+
+    private ParDo.SingleOutput<Object, Object> copyToTable(
+        SnowflakeService snowflakeService, String stagingBucketDir) {
+      return ParDo.of(
+          new CopyToTableFn<>(
+              getDataSourceProviderFn(),
+              getTable(),
+              getQuery(),
+              stagingBucketDir,
+              getLocation(),
+              getWriteDisposition(),
+              snowflakeService));
+    }
+  }
+
+  public static class Concatenate extends Combine.CombineFn<String, List<String>, List<String>> {
+    @Override
+    public List<String> createAccumulator() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public List<String> addInput(List<String> mutableAccumulator, String input) {
+      mutableAccumulator.add(String.format("'%s'", input));
+      return mutableAccumulator;
+    }
+
+    @Override
+    public List<String> mergeAccumulators(Iterable<List<String>> accumulators) {
+      List<String> result = createAccumulator();
+      for (List<String> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
+
+    @Override
+    public List<String> extractOutput(List<String> accumulator) {
+      return accumulator;
+    }
+  }
+
+  private static class MapUserDataObjectsArrayFn<T> extends DoFn<T, Object[]> {
+    private final UserDataMapper<T> csvMapper;
+
+    public MapUserDataObjectsArrayFn(UserDataMapper<T> csvMapper) {
+      this.csvMapper = csvMapper;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) throws Exception {
+      context.output(csvMapper.mapRow(context.element()));
+    }
+  }
+
+  /**
+   * Custom DoFn that maps {@link Object[]} into CSV line to be saved to Snowflake.
+   *
+   * <p>Adds Snowflake-specific quotations around strings.
+   */
+  private static class MapObjectsArrayToCsvFn extends DoFn<Object[], String> {
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      List<Object> csvItems = new ArrayList<>();
+      for (Object o : context.element()) {
+        if (o instanceof String) {
+          String field = (String) o;
+          field = field.replace("'", "''");
+          field = quoteField(field);
+
+          csvItems.add(field);
+        } else {
+          csvItems.add(o);

Review comment:
       The line above, `field = field.replace("'", "''");`? It looks like the `''` style. If you confirm it works with Snowflake, I'll trust you!







[GitHub] [beam] pabloem commented on pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11794:
URL: https://github.com/apache/beam/pull/11794#issuecomment-637058818


   retest this please





[GitHub] [beam] purbanow commented on pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on pull request #11794:
URL: https://github.com/apache/beam/pull/11794#issuecomment-642150480


   Hey @RyanSkraba, will you find a moment to check again?








[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r433652793



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/CSVSink.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.Charset;
+import java.util.List;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+
+/** Implementation of {@link org.apache.beam.sdk.io.FileIO.Sink} for writing CSV. */
+public class CSVSink implements FileIO.Sink<String> {

Review comment:
       Thanks for spotting this. I removed the `CSVSink` class and started using `.via(TextIO.sink())`.
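       For reference, a minimal sketch of that change, assuming the CSV lines are already formatted strings (names like `csvLines` and `stagingBucketDir` are placeholders, not the exact code in the PR):

```java
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.values.PCollection;

class WriteCsvFilesSketch {
  // Writes already-formatted CSV lines as gzipped .csv files under the staging directory,
  // using the built-in TextIO.sink() instead of a custom CSVSink.
  static WriteFilesResult<Void> writeCsvFiles(
      PCollection<String> csvLines, String stagingBucketDir) {
    return csvLines.apply(
        "Write files to specified location",
        FileIO.<String>write()
            .via(TextIO.sink())
            .to(stagingBucketDir)
            .withSuffix(".csv")
            .withCompression(Compression.GZIP));
  }
}
```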




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r432458477



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -447,6 +494,346 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #write()}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract Location getLocation();
+
+    @Nullable
+    abstract String getFileNameTemplate();
+
+    @Nullable
+    abstract WriteDisposition getWriteDisposition();
+
+    @Nullable
+    abstract UserDataMapper getUserDataMapper();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setLocation(Location location);
+
+      abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
+
+      abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+
+      abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Write<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Write<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Write<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A table name to be written in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Write<T> to(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Write<T> withQueryTransformation(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A location object which contains connection config between Snowflake and GCP.
+     *
+     * @param location - an instance of {@link Location}.
+     */
+    public Write<T> via(Location location) {
+      return toBuilder().setLocation(location).build();
+    }
+
+    /**
+     * A template name for files saved to GCP.
+     *
+     * @param fileNameTemplate - String with template name for files.
+     */
+    public Write<T> withFileNameTemplate(String fileNameTemplate) {
+      return toBuilder().setFileNameTemplate(fileNameTemplate).build();
+    }
+
+    /**
+     * User-defined function mapping user data into CSV lines.
+     *
+     * @param userDataMapper - an instance of {@link UserDataMapper}.
+     */
+    public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+      return toBuilder().setUserDataMapper(userDataMapper).build();
+    }
+
+    /**
+     * A disposition to be used during writing to table phase.
+     *
+     * @param writeDisposition - an instance of {@link WriteDisposition}.
+     */
+    public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
+      return toBuilder().setWriteDisposition(writeDisposition).build();
+    }
+
+    /**
+     * A snowflake service which is supposed to be used. Note: Currently we have {@link
+     * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
+     *
+     * @param snowflakeService - an instance of {@link SnowflakeService}.
+     */
+    public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
+      return toBuilder().setSnowflakeService(snowflakeService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      Location loc = getLocation();
+      checkArguments(loc);
+
+      String stagingBucketDir = String.format("%s/%s/", loc.getStagingBucketName(), WRITE_TMP_PATH);
+
+      PCollection out = write(input, stagingBucketDir);
+      out.setCoder(StringUtf8Coder.of());
+
+      return PDone.in(out.getPipeline());
+    }
+
+    private void checkArguments(Location loc) {
+      checkArgument(loc != null, "via() is required");
+      checkArgument(
+          loc.getStorageIntegrationName() != null,
+          "location with storageIntegrationName is required");

Review comment:
       Yes, it can be `nullable` for the Write method. Thanks for spotting this :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r432427386



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/Location.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import java.io.Serializable;
+
+/**
+ * Used as one of the arguments for {@link org.apache.beam.sdk.io.snowflake.SnowflakeIO} write and
+ * read operations. It keeps information about storage integration and staging bucket name.
+ * Integration name is Snowflake storage integration object created according to Snowflake
+ * documentation for the GCS bucket. Staging bucket name is Google Cloud Storage bucket which in the
+ * case of writing operation will be used to save CSV files which will end up in Snowflake under
+ * “staging_bucket_name/data” path and in the case of reading operation will be used as a temporary
+ * location for storing CSV files named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` which will be removed
+ * automatically once Read operation finishes.
+ */
+public class Location implements Serializable {
+  private String storageIntegrationName;

Review comment:
       Both `storageIntegrationName` and `stagingBucketName` are always required for the connection between Snowflake and GCP.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r432427386



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/Location.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import java.io.Serializable;
+
+/**
+ * Used as one of the arguments for {@link org.apache.beam.sdk.io.snowflake.SnowflakeIO} write and
+ * read operations. It keeps information about storage integration and staging bucket name.
+ * Integration name is Snowflake storage integration object created according to Snowflake
+ * documentation for the GCS bucket. Staging bucket name is Google Cloud Storage bucket which in the
+ * case of writing operation will be used to save CSV files which will end up in Snowflake under
+ * “staging_bucket_name/data” path and in the case of reading operation will be used as a temporary
+ * location for storing CSV files named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` which will be removed
+ * automatically once Read operation finishes.
+ */
+public class Location implements Serializable {
+  private String storageIntegrationName;

Review comment:
       Both `storageIntegrationName` and `stagingBucketName` are always required for a proper connection between Snowflake and GCP.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r432432306



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -447,6 +494,346 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #write()}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract Location getLocation();
+
+    @Nullable
+    abstract String getFileNameTemplate();
+
+    @Nullable
+    abstract WriteDisposition getWriteDisposition();
+
+    @Nullable
+    abstract UserDataMapper getUserDataMapper();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setLocation(Location location);
+
+      abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
+
+      abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+
+      abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Write<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Write<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Write<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A table name to be written in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Write<T> to(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Write<T> withQueryTransformation(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A location object which contains connection config between Snowflake and GCP.
+     *
+     * @param location - an instance of {@link Location}.
+     */
+    public Write<T> via(Location location) {
+      return toBuilder().setLocation(location).build();
+    }
+
+    /**
+     * A template name for files saved to GCP.
+     *
+     * @param fileNameTemplate - String with template name for files.
+     */
+    public Write<T> withFileNameTemplate(String fileNameTemplate) {
+      return toBuilder().setFileNameTemplate(fileNameTemplate).build();
+    }
+
+    /**
+     * User-defined function mapping user data into CSV lines.
+     *
+     * @param userDataMapper - an instance of {@link UserDataMapper}.
+     */
+    public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+      return toBuilder().setUserDataMapper(userDataMapper).build();
+    }
+
+    /**
+     * A disposition to be used during writing to table phase.
+     *
+     * @param writeDisposition - an instance of {@link WriteDisposition}.
+     */
+    public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
+      return toBuilder().setWriteDisposition(writeDisposition).build();
+    }
+
+    /**
+     * A snowflake service which is supposed to be used. Note: Currently we have {@link
+     * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
+     *
+     * @param snowflakeService - an instance of {@link SnowflakeService}.
+     */
+    public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
+      return toBuilder().setSnowflakeService(snowflakeService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      Location loc = getLocation();
+      checkArguments(loc);
+
+      String stagingBucketDir = String.format("%s/%s/", loc.getStagingBucketName(), WRITE_TMP_PATH);
+
+      PCollection out = write(input, stagingBucketDir);
+      out.setCoder(StringUtf8Coder.of());
+
+      return PDone.in(out.getPipeline());
+    }
+
+    private void checkArguments(Location loc) {
+      checkArgument(loc != null, "via() is required");
+      checkArgument(
+          loc.getStorageIntegrationName() != null,
+          "location with storageIntegrationName is required");
+      checkArgument(
+          loc.getStagingBucketName() != null, "location with stagingBucketName is required");
+
+      checkArgument(getUserDataMapper() != null, "withUserDataMapper() is required");
+
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      checkArgument(getTable() != null, "withTable() is required");

Review comment:
       Yes, you're right. I changed the `to()` method to `withTable()` to be more consistent.
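       Roughly, usage now looks like the sketch below. This is hypothetical: helper types such as `UserDataMapper`, `Location` and `DataSourceConfiguration` (and their packages/nesting) are assumed here, and the final API may still differ from this thread.

```java
import org.apache.beam.sdk.io.snowflake.Location;
import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
import org.apache.beam.sdk.io.snowflake.UserDataMapper;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

class SnowflakeWriteUsageSketch {
  // `rows`, `config`, `location` and `mapper` are assumed to be provided by the pipeline.
  static PDone writeRows(
      PCollection<String[]> rows,
      SnowflakeIO.DataSourceConfiguration config,
      Location location,
      UserDataMapper<String[]> mapper) {
    return rows.apply(
        "Write to Snowflake",
        SnowflakeIO.<String[]>write()
            .withDataSourceConfiguration(config)
            .withTable("MY_TABLE") // renamed from to("MY_TABLE") in this review round
            .via(location)
            .withUserDataMapper(mapper));
  }
}
```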




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r433610313



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -447,6 +494,346 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #write()}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract Location getLocation();
+
+    @Nullable
+    abstract String getFileNameTemplate();
+
+    @Nullable
+    abstract WriteDisposition getWriteDisposition();
+
+    @Nullable
+    abstract UserDataMapper getUserDataMapper();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setLocation(Location location);
+
+      abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
+
+      abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+
+      abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Write<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Write<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Write<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A table name to be written in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Write<T> to(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Write<T> withQueryTransformation(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A location object which contains connection config between Snowflake and GCP.
+     *
+     * @param location - an instance of {@link Location}.
+     */
+    public Write<T> via(Location location) {
+      return toBuilder().setLocation(location).build();
+    }
+
+    /**
+     * A template name for files saved to GCP.
+     *
+     * @param fileNameTemplate - String with template name for files.
+     */
+    public Write<T> withFileNameTemplate(String fileNameTemplate) {
+      return toBuilder().setFileNameTemplate(fileNameTemplate).build();
+    }
+
+    /**
+     * User-defined function mapping user data into CSV lines.
+     *
+     * @param userDataMapper - an instance of {@link UserDataMapper}.
+     */
+    public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+      return toBuilder().setUserDataMapper(userDataMapper).build();
+    }
+
+    /**
+     * A disposition to be used during writing to table phase.
+     *
+     * @param writeDisposition - an instance of {@link WriteDisposition}.
+     */
+    public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
+      return toBuilder().setWriteDisposition(writeDisposition).build();
+    }
+
+    /**
+     * A snowflake service which is supposed to be used. Note: Currently we have {@link
+     * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
+     *
+     * @param snowflakeService - an instance of {@link SnowflakeService}.
+     */
+    public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
+      return toBuilder().setSnowflakeService(snowflakeService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      Location loc = getLocation();
+      checkArguments(loc);
+
+      String stagingBucketDir = String.format("%s/%s/", loc.getStagingBucketName(), WRITE_TMP_PATH);
+
+      PCollection out = write(input, stagingBucketDir);
+      out.setCoder(StringUtf8Coder.of());
+
+      return PDone.in(out.getPipeline());
+    }
+
+    private void checkArguments(Location loc) {
+      checkArgument(loc != null, "via() is required");
+      checkArgument(
+          loc.getStorageIntegrationName() != null,
+          "location with storageIntegrationName is required");
+      checkArgument(
+          loc.getStagingBucketName() != null, "location with stagingBucketName is required");
+
+      checkArgument(getUserDataMapper() != null, "withUserDataMapper() is required");
+
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      checkArgument(getTable() != null, "withTable() is required");
+    }
+
+    private PCollection write(PCollection input, String stagingBucketDir) {
+      SnowflakeService snowflakeService =
+          getSnowflakeService() != null ? getSnowflakeService() : new SnowflakeServiceImpl();
+
+      PCollection files = writeFiles(input, stagingBucketDir);
+
+      files =
+          (PCollection)
+              files.apply("Create list of files to copy", Combine.globally(new Concatenate()));
+
+      return (PCollection)
+          files.apply("Copy files to table", copyToTable(snowflakeService, stagingBucketDir));
+    }
+
+    private PCollection writeFiles(PCollection<T> input, String stagingBucketDir) {
+      class Parse extends DoFn<KV<T, String>, String> {

Review comment:
       Thanks for this tip :) I removed the `Parse` class.
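       For reference, Beam's built-in `Values.create()` does exactly what the removed DoFn did; a minimal sketch (a guess at the suggested replacement, not necessarily the exact change in the PR):

```java
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class ExtractFilenamesSketch {
  // Values.create() maps a PCollection<KV<K, V>> to a PCollection<V>, which is all
  // the removed Parse DoFn did with the per-destination output filenames.
  static PCollection<String> extractFilenames(
      PCollection<KV<Void, String>> perDestinationFilenames) {
    return perDestinationFilenames.apply(
        "Parse KV filenames to Strings", Values.<String>create());
  }
}
```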




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kkucharc merged pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
kkucharc merged pull request #11794:
URL: https://github.com/apache/beam/pull/11794


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r433114311



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -447,6 +494,346 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #write()}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract Location getLocation();
+
+    @Nullable
+    abstract String getFileNameTemplate();
+
+    @Nullable
+    abstract WriteDisposition getWriteDisposition();
+
+    @Nullable
+    abstract UserDataMapper getUserDataMapper();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setLocation(Location location);
+
+      abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
+
+      abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+
+      abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Write<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Write<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Write<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A table name to be written in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Write<T> to(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Write<T> withQueryTransformation(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A location object which contains connection config between Snowflake and GCP.
+     *
+     * @param location - an instance of {@link Location}.
+     */
+    public Write<T> via(Location location) {
+      return toBuilder().setLocation(location).build();
+    }
+
+    /**
+     * A template name for files saved to GCP.
+     *
+     * @param fileNameTemplate - String with template name for files.
+     */
+    public Write<T> withFileNameTemplate(String fileNameTemplate) {
+      return toBuilder().setFileNameTemplate(fileNameTemplate).build();
+    }
+
+    /**
+     * User-defined function mapping user data into CSV lines.
+     *
+     * @param userDataMapper - an instance of {@link UserDataMapper}.
+     */
+    public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+      return toBuilder().setUserDataMapper(userDataMapper).build();
+    }
+
+    /**
+     * A disposition to be used during writing to table phase.
+     *
+     * @param writeDisposition - an instance of {@link WriteDisposition}.
+     */
+    public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
+      return toBuilder().setWriteDisposition(writeDisposition).build();
+    }
+
+    /**
+     * A snowflake service which is supposed to be used. Note: Currently we have {@link
+     * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
+     *
+     * @param snowflakeService - an instance of {@link SnowflakeService}.
+     */
+    public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
+      return toBuilder().setSnowflakeService(snowflakeService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      Location loc = getLocation();
+      checkArguments(loc);
+
+      String stagingBucketDir = String.format("%s/%s/", loc.getStagingBucketName(), WRITE_TMP_PATH);
+
+      PCollection out = write(input, stagingBucketDir);
+      out.setCoder(StringUtf8Coder.of());
+
+      return PDone.in(out.getPipeline());
+    }
+
+    private void checkArguments(Location loc) {
+      checkArgument(loc != null, "via() is required");
+      checkArgument(
+          loc.getStorageIntegrationName() != null,
+          "location with storageIntegrationName is required");
+      checkArgument(
+          loc.getStagingBucketName() != null, "location with stagingBucketName is required");
+
+      checkArgument(getUserDataMapper() != null, "withUserDataMapper() is required");
+
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      checkArgument(getTable() != null, "withTable() is required");
+    }
+
+    private PCollection write(PCollection input, String stagingBucketDir) {
+      SnowflakeService snowflakeService =
+          getSnowflakeService() != null ? getSnowflakeService() : new SnowflakeServiceImpl();
+
+      PCollection files = writeFiles(input, stagingBucketDir);
+
+      files =
+          (PCollection)
+              files.apply("Create list of files to copy", Combine.globally(new Concatenate()));
+
+      return (PCollection)
+          files.apply("Copy files to table", copyToTable(snowflakeService, stagingBucketDir));
+    }
+
+    private PCollection writeFiles(PCollection<T> input, String stagingBucketDir) {
+      class Parse extends DoFn<KV<T, String>, String> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          c.output(c.element().getValue());
+        }
+      }
+
+      PCollection mappedUserData =
+          input
+              .apply(
+                  "Map user data to Objects array",
+                  ParDo.of(new MapUserDataObjectsArrayFn<T>(getUserDataMapper())))
+              .apply("Map Objects array to CSV lines", ParDo.of(new MapObjectsArrayToCsvFn()))
+              .setCoder(StringUtf8Coder.of());
+
+      WriteFilesResult filesResult =
+          (WriteFilesResult)
+              mappedUserData.apply(
+                  "Write files to specified location",
+                  FileIO.write()
+                      .via((FileIO.Sink) new CSVSink())
+                      .to(stagingBucketDir)
+                      .withPrefix(getFileNameTemplate())
+                      .withSuffix(".csv")
+                      .withPrefix(UUID.randomUUID().toString().subSequence(0, 8).toString())
+                      .withCompression(Compression.GZIP));
+
+      return (PCollection)
+          filesResult
+              .getPerDestinationOutputFilenames()
+              .apply("Parse KV filenames to Strings", ParDo.of(new Parse()));
+    }
+
+    private ParDo.SingleOutput<Object, Object> copyToTable(
+        SnowflakeService snowflakeService, String stagingBucketDir) {
+      return ParDo.of(
+          new CopyToTableFn<>(
+              getDataSourceProviderFn(),
+              getTable(),
+              getQuery(),
+              stagingBucketDir,
+              getLocation(),
+              getWriteDisposition(),
+              snowflakeService));
+    }
+  }
+
+  public static class Concatenate extends Combine.CombineFn<String, List<String>, List<String>> {
+    @Override
+    public List<String> createAccumulator() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public List<String> addInput(List<String> mutableAccumulator, String input) {
+      mutableAccumulator.add(String.format("'%s'", input));
+      return mutableAccumulator;
+    }
+
+    @Override
+    public List<String> mergeAccumulators(Iterable<List<String>> accumulators) {
+      List<String> result = createAccumulator();
+      for (List<String> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
+
+    @Override
+    public List<String> extractOutput(List<String> accumulator) {
+      return accumulator;
+    }
+  }
+
+  private static class MapUserDataObjectsArrayFn<T> extends DoFn<T, Object[]> {

Review comment:
       Thanks, I changed it to `MapElements.via` with a `SimpleFunction`.
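       A minimal sketch of that form (names are placeholders, and the `UserDataMapper` import location is assumed; the actual change in the PR may differ):

```java
import org.apache.beam.sdk.io.snowflake.UserDataMapper;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;

class MapUserDataSketch {
  // Maps each user element to an Object[] row via the user-supplied mapper,
  // using MapElements.via with a SimpleFunction instead of a dedicated DoFn.
  static <T> PCollection<Object[]> mapToObjectsArray(
      PCollection<T> input, final UserDataMapper<T> mapper) {
    return input.apply(
        "Map user data to Objects array",
        MapElements.via(
            new SimpleFunction<T, Object[]>() {
              @Override
              public Object[] apply(T element) {
                return mapper.mapRow(element);
              }
            }));
  }
}
```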




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow edited a comment on pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow edited a comment on pull request #11794:
URL: https://github.com/apache/beam/pull/11794#issuecomment-642150480


   Hey @RyanSkraba, we answered all your comments. Will you find a moment to check again?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r432412298



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/Location.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import java.io.Serializable;
+
+/**
+ * Used as one of the arguments for {@link org.apache.beam.sdk.io.snowflake.SnowflakeIO} write and
+ * read operations. It keeps information about storage integration and staging bucket name.
+ * Integration name is Snowflake storage integration object created according to Snowflake
+ * documentation for the GCS bucket. Staging bucket name is Google Cloud Storage bucket which in the
+ * case of writing operation will be used to save CSV files which will end up in Snowflake under
+ * “staging_bucket_name/data” path and in the case of reading operation will be used as a temporary
+ * location for storing CSV files named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` which will be removed
+ * automatically once Read operation finishes.
+ */
+public class Location implements Serializable {
+  private String storageIntegrationName;

Review comment:
       It makes sense. I added `@Nullable`. Thanks a lot :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r433739364



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/CSVSink.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.Charset;
+import java.util.List;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+
+/** Implementation of {@link org.apache.beam.sdk.io.FileIO.Sink} for writing CSV. */
+public class CSVSink implements FileIO.Sink<String> {

Review comment:
       In one of the next PRs, we're planning to add `create dispositions` for tables, which is more closely connected with the schema concept. Let's talk about it then :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] RyanSkraba commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
RyanSkraba commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r431912755



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -447,6 +494,346 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #write()}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract Location getLocation();
+
+    @Nullable
+    abstract String getFileNameTemplate();
+
+    @Nullable
+    abstract WriteDisposition getWriteDisposition();
+
+    @Nullable
+    abstract UserDataMapper getUserDataMapper();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setLocation(Location location);
+
+      abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
+
+      abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+
+      abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Write<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Write<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Write<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A table name to be written in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Write<T> to(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Write<T> withQueryTransformation(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A location object which contains connection config between Snowflake and GCP.
+     *
+     * @param location - an instance of {@link Location}.
+     */
+    public Write<T> via(Location location) {
+      return toBuilder().setLocation(location).build();
+    }
+
+    /**
+     * A template name for files saved to GCP.
+     *
+     * @param fileNameTemplate - String with template name for files.
+     */
+    public Write<T> withFileNameTemplate(String fileNameTemplate) {
+      return toBuilder().setFileNameTemplate(fileNameTemplate).build();
+    }
+
+    /**
+     * User-defined function mapping user data into CSV lines.
+     *
+     * @param userDataMapper - an instance of {@link UserDataMapper}.
+     */
+    public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+      return toBuilder().setUserDataMapper(userDataMapper).build();
+    }
+
+    /**
+     * A disposition to be used during writing to table phase.
+     *
+     * @param writeDisposition - an instance of {@link WriteDisposition}.
+     */
+    public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
+      return toBuilder().setWriteDisposition(writeDisposition).build();
+    }
+
+    /**
+     * A snowflake service which is supposed to be used. Note: Currently we have {@link
+     * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
+     *
+     * @param snowflakeService - an instance of {@link SnowflakeService}.
+     */
+    public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
+      return toBuilder().setSnowflakeService(snowflakeService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      Location loc = getLocation();
+      checkArguments(loc);
+
+      String stagingBucketDir = String.format("%s/%s/", loc.getStagingBucketName(), WRITE_TMP_PATH);
+
+      PCollection out = write(input, stagingBucketDir);
+      out.setCoder(StringUtf8Coder.of());
+
+      return PDone.in(out.getPipeline());
+    }
+
+    private void checkArguments(Location loc) {
+      checkArgument(loc != null, "via() is required");
+      checkArgument(
+          loc.getStorageIntegrationName() != null,
+          "location with storageIntegrationName is required");
+      checkArgument(
+          loc.getStagingBucketName() != null, "location with stagingBucketName is required");
+
+      checkArgument(getUserDataMapper() != null, "withUserDataMapper() is required");
+
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      checkArgument(getTable() != null, "withTable() is required");
+    }
+
+    private PCollection write(PCollection input, String stagingBucketDir) {
+      SnowflakeService snowflakeService =
+          getSnowflakeService() != null ? getSnowflakeService() : new SnowflakeServiceImpl();
+
+      PCollection files = writeFiles(input, stagingBucketDir);
+
+      files =
+          (PCollection)
+              files.apply("Create list of files to copy", Combine.globally(new Concatenate()));
+
+      return (PCollection)
+          files.apply("Copy files to table", copyToTable(snowflakeService, stagingBucketDir));
+    }
+
+    private PCollection writeFiles(PCollection<T> input, String stagingBucketDir) {
+      class Parse extends DoFn<KV<T, String>, String> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          c.output(c.element().getValue());
+        }
+      }
+
+      PCollection mappedUserData =
+          input
+              .apply(
+                  "Map user data to Objects array",
+                  ParDo.of(new MapUserDataObjectsArrayFn<T>(getUserDataMapper())))
+              .apply("Map Objects array to CSV lines", ParDo.of(new MapObjectsArrayToCsvFn()))
+              .setCoder(StringUtf8Coder.of());
+
+      WriteFilesResult filesResult =
+          (WriteFilesResult)
+              mappedUserData.apply(
+                  "Write files to specified location",
+                  FileIO.write()
+                      .via((FileIO.Sink) new CSVSink())
+                      .to(stagingBucketDir)
+                      .withPrefix(getFileNameTemplate())
+                      .withSuffix(".csv")
+                      .withPrefix(UUID.randomUUID().toString().subSequence(0, 8).toString())
+                      .withCompression(Compression.GZIP));
+
+      return (PCollection)
+          filesResult
+              .getPerDestinationOutputFilenames()
+              .apply("Parse KV filenames to Strings", ParDo.of(new Parse()));
+    }
+
+    private ParDo.SingleOutput<Object, Object> copyToTable(
+        SnowflakeService snowflakeService, String stagingBucketDir) {
+      return ParDo.of(
+          new CopyToTableFn<>(
+              getDataSourceProviderFn(),
+              getTable(),
+              getQuery(),
+              stagingBucketDir,
+              getLocation(),
+              getWriteDisposition(),
+              snowflakeService));
+    }
+  }
+
+  public static class Concatenate extends Combine.CombineFn<String, List<String>, List<String>> {
+    @Override
+    public List<String> createAccumulator() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public List<String> addInput(List<String> mutableAccumulator, String input) {
+      mutableAccumulator.add(String.format("'%s'", input));
+      return mutableAccumulator;
+    }
+
+    @Override
+    public List<String> mergeAccumulators(Iterable<List<String>> accumulators) {
+      List<String> result = createAccumulator();
+      for (List<String> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
+
+    @Override
+    public List<String> extractOutput(List<String> accumulator) {
+      return accumulator;
+    }
+  }
+
+  private static class MapUserDataObjectsArrayFn<T> extends DoFn<T, Object[]> {
+    private final UserDataMapper<T> csvMapper;
+
+    public MapUserDataObjectsArrayFn(UserDataMapper<T> csvMapper) {
+      this.csvMapper = csvMapper;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) throws Exception {
+      context.output(csvMapper.mapRow(context.element()));
+    }
+  }
+
+  /**
+   * Custom DoFn that maps {@link Object[]} into CSV line to be saved to Snowflake.
+   *
+   * <p>Adds Snowflake-specific quotations around strings.
+   */
+  private static class MapObjectsArrayToCsvFn extends DoFn<Object[], String> {
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      List<Object> csvItems = new ArrayList<>();
+      for (Object o : context.element()) {
+        if (o instanceof String) {
+          String field = (String) o;
+          field = field.replace("'", "''");
+          field = quoteField(field);
+
+          csvItems.add(field);
+        } else {
+          csvItems.add(o);

Review comment:
       Ugh, found a reference: https://docs.snowflake.com/en/user-guide/data-load-considerations-prepare.html#preparing-delimited-text-files
   
   > Fields that contain delimiter characters should be enclosed in quotes (single or double). If the data contains single or double quotes, then those quotes must be escaped.
   
    Can you confirm that this means `'I can''t believe it.'` or `'I can\'t believe it'`?
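    For reference, the escaping in the quoted `MapObjectsArrayToCsvFn` produces the doubled-quote form; a tiny standalone illustration, assuming `quoteField` simply wraps the value in single quotes:

```java
public class QuoteEscapingExample {
  public static void main(String[] args) {
    // Mirrors the escaping in the DoFn quoted above.
    String field = "I can't believe it.";
    String escaped = field.replace("'", "''"); // I can''t believe it.
    String quoted = "'" + escaped + "'";       // 'I can''t believe it.'
    System.out.println(quoted);
  }
}
```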




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r433050696



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -447,6 +494,346 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #write()}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract Location getLocation();
+
+    @Nullable
+    abstract String getFileNameTemplate();
+
+    @Nullable
+    abstract WriteDisposition getWriteDisposition();
+
+    @Nullable
+    abstract UserDataMapper getUserDataMapper();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setLocation(Location location);
+
+      abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
+
+      abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+
+      abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Write<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Write<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Write<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A table name to be written in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Write<T> to(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Write<T> withQueryTransformation(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A location object which contains connection config between Snowflake and GCP.
+     *
+     * @param location - an instance of {@link Location}.
+     */
+    public Write<T> via(Location location) {
+      return toBuilder().setLocation(location).build();
+    }
+
+    /**
+     * A template name for files saved to GCP.
+     *
+     * @param fileNameTemplate - String with template name for files.
+     */
+    public Write<T> withFileNameTemplate(String fileNameTemplate) {
+      return toBuilder().setFileNameTemplate(fileNameTemplate).build();
+    }
+
+    /**
+     * User-defined function mapping user data into CSV lines.
+     *
+     * @param userDataMapper - an instance of {@link UserDataMapper}.
+     */
+    public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+      return toBuilder().setUserDataMapper(userDataMapper).build();
+    }
+
+    /**
+     * A disposition to be used during writing to table phase.
+     *
+     * @param writeDisposition - an instance of {@link WriteDisposition}.
+     */
+    public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
+      return toBuilder().setWriteDisposition(writeDisposition).build();
+    }
+
+    /**
+     * A snowflake service which is supposed to be used. Note: Currently we have {@link
+     * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
+     *
+     * @param snowflakeService - an instance of {@link SnowflakeService}.
+     */
+    public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
+      return toBuilder().setSnowflakeService(snowflakeService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      Location loc = getLocation();
+      checkArguments(loc);
+
+      String stagingBucketDir = String.format("%s/%s/", loc.getStagingBucketName(), WRITE_TMP_PATH);
+
+      PCollection out = write(input, stagingBucketDir);
+      out.setCoder(StringUtf8Coder.of());
+
+      return PDone.in(out.getPipeline());
+    }
+
+    private void checkArguments(Location loc) {
+      checkArgument(loc != null, "via() is required");
+      checkArgument(
+          loc.getStorageIntegrationName() != null,
+          "location with storageIntegrationName is required");
+      checkArgument(
+          loc.getStagingBucketName() != null, "location with stagingBucketName is required");
+
+      checkArgument(getUserDataMapper() != null, "withUserDataMapper() is required");
+
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      checkArgument(getTable() != null, "withTable() is required");
+    }
+
+    private PCollection write(PCollection input, String stagingBucketDir) {

Review comment:
       I wouldn't say that it doesn't add any value as it is; in my opinion it makes the code more readable.







[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r432427386



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/Location.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import java.io.Serializable;
+
+/**
+ * Used as one of the arguments for {@link org.apache.beam.sdk.io.snowflake.SnowflakeIO} write and
+ * read operations. It keeps information about storage integration and staging bucket name.
+ * Integration name is Snowflake storage integration object created according to Snowflake
+ * documentation for the GCS bucket. Staging bucket name is Google Cloud Storage bucket which in the
+ * case of writing operation will be used to save CSV files which will end up in Snowflake under
+ * “staging_bucket_name/data” path and in the case of reading operation will be used as a temporary
+ * location for storing CSV files named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` which will be removed
+ * automatically once Read operation finishes.
+ */
+public class Location implements Serializable {
+  private String storageIntegrationName;

Review comment:
       Both `storageIntegrationName` and `stagingBucketName` are always required.







[GitHub] [beam] purbanow commented on pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on pull request #11794:
URL: https://github.com/apache/beam/pull/11794#issuecomment-647995341


   @RyanSkraba Thanks a lot for your CR.
   
   We're going to add your CSV concern to our feature improvement list.





[GitHub] [beam] pabloem commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r430612172



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/CSVSink.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.Charset;
+import java.util.List;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+
+/** Implementation of {@link org.apache.beam.sdk.io.FileIO.Sink} for writing CSV. */
+public class CSVSink implements FileIO.Sink<String> {

Review comment:
       I took only a very superficial look. Are you using CSV file imports to write to Snowflake? I would be concerned about types being properly encoded in the CSV (big integers, floats, bytes, date/time types, strings with newlines/quotes). Have you already considered how to deal with all types?
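   One way to keep that encoding under control, sketched minimally below, is to turn every field into a String inside the user mapper so formatting is decided up front; only the single `mapRow(T)` returning `Object[]` that the diff calls on `UserDataMapper` is taken from the PR, while `RowMapper` and `ExampleRow` are made-up stand-ins for illustration:

    ```java
    import java.io.Serializable;
    import java.math.BigDecimal;
    import java.time.Instant;

    public class MapperSketch {
      // Stand-in mirroring the single mapRow(T) -> Object[] call the diff makes
      // on UserDataMapper.
      interface RowMapper<T> extends Serializable {
        Object[] mapRow(T element);
      }

      // Made-up POJO, used purely for illustration.
      static class ExampleRow {
        long id;
        String name;
        double amount;
        long createdAtMillis;
      }

      // Convert every field to a String up front so numeric and temporal
      // formatting is decided by the pipeline author rather than by toString()
      // when the CSV line is assembled.
      static RowMapper<ExampleRow> csvMapper() {
        return row ->
            new Object[] {
              String.valueOf(row.id),
              row.name, // Strings get Snowflake-style quoting/escaping in the DoFn
              BigDecimal.valueOf(row.amount).toPlainString(), // no scientific notation
              Instant.ofEpochMilli(row.createdAtMillis).toString() // ISO-8601
            };
      }
    }
    ```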







[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r432412298



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/Location.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import java.io.Serializable;
+
+/**
+ * Used as one of the arguments for {@link org.apache.beam.sdk.io.snowflake.SnowflakeIO} write and
+ * read operations. It keeps information about storage integration and staging bucket name.
+ * Integration name is Snowflake storage integration object created according to Snowflake
+ * documentation for the GCS bucket. Staging bucket name is Google Cloud Storage bucket which in the
+ * case of writing operation will be used to save CSV files which will end up in Snowflake under
+ * “staging_bucket_name/data” path and in the case of reading operation will be used as a temporary
+ * location for storing CSV files named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` which will be removed
+ * automatically once Read operation finishes.
+ */
+public class Location implements Serializable {
+  private String storageIntegrationName;

Review comment:
       It makes sense. I added `@Nullable`. Thanks a lot :) 







[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r444031088



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -447,6 +513,346 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #write()}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getStorageIntegrationName();
+
+    @Nullable
+    abstract String getStagingBucketName();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract String getFileNameTemplate();
+
+    @Nullable
+    abstract WriteDisposition getWriteDisposition();
+
+    @Nullable
+    abstract UserDataMapper getUserDataMapper();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setStorageIntegrationName(String storageIntegrationName);
+
+      abstract Builder<T> setStagingBucketName(String stagingBucketName);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
+
+      abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+
+      abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Write<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Write<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Write<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A table name to be written in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Write<T> withTable(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * Name of the cloud bucket (GCS by now) to use as tmp location of CSVs during COPY statement.
+     *
+     * @param stagingBucketName - String with the name of the bucket.
+     */
+    public Write<T> withStagingBucketName(String stagingBucketName) {
+      return toBuilder().setStagingBucketName(stagingBucketName).build();
+    }
+
+    /**
+     * Name of the Storage Integration in Snowflake to be used. See
+     * https://docs.snowflake.com/en/sql-reference/sql/create-storage-integration.html for
+     * reference.
+     *
+     * @param integrationName - String with the name of the Storage Integration.
+     */
+    public Write<T> withStorageIntegrationName(String integrationName) {
+      return toBuilder().setStorageIntegrationName(integrationName).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Write<T> withQueryTransformation(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A template name for files saved to GCP.
+     *
+     * @param fileNameTemplate - String with template name for files.
+     */
+    public Write<T> withFileNameTemplate(String fileNameTemplate) {
+      return toBuilder().setFileNameTemplate(fileNameTemplate).build();
+    }
+
+    /**
+     * User-defined function mapping user data into CSV lines.
+     *
+     * @param userDataMapper - an instance of {@link UserDataMapper}.
+     */
+    public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+      return toBuilder().setUserDataMapper(userDataMapper).build();
+    }
+
+    /**
+     * A disposition to be used during writing to table phase.
+     *
+     * @param writeDisposition - an instance of {@link WriteDisposition}.
+     */
+    public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
+      return toBuilder().setWriteDisposition(writeDisposition).build();
+    }
+
+    /**
+     * A snowflake service which is supposed to be used. Note: Currently we have {@link
+     * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
+     *
+     * @param snowflakeService - an instance of {@link SnowflakeService}.
+     */
+    public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
+      return toBuilder().setSnowflakeService(snowflakeService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      checkArguments();
+
+      String stagingBucketDir = String.format("%s/%s/", getStagingBucketName(), WRITE_TMP_PATH);
+
+      PCollection<String> out = write(input, stagingBucketDir);
+      out.setCoder(StringUtf8Coder.of());
+
+      return PDone.in(out.getPipeline());
+    }
+
+    private void checkArguments() {
+      checkArgument(getStagingBucketName() != null, "withStagingBucketName is required");
+
+      checkArgument(getUserDataMapper() != null, "withUserDataMapper() is required");
+
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      checkArgument(getTable() != null, "withTable() is required");
+    }
+
+    private PCollection<String> write(PCollection<T> input, String stagingBucketDir) {
+      SnowflakeService snowflakeService =
+          getSnowflakeService() != null ? getSnowflakeService() : new SnowflakeServiceImpl();
+
+      PCollection<String> files = writeFiles(input, stagingBucketDir);
+
+      // Combining PCollection of files as a side input into one list of files
+      ListCoder<String> coder = ListCoder.of(StringUtf8Coder.of());
+      files =
+          (PCollection)
+              files
+                  .getPipeline()
+                  .apply(
+                      Reify.viewInGlobalWindow(
+                          (PCollectionView) files.apply(View.asList()), coder));
+
+      return (PCollection)
+          files.apply("Copy files to table", copyToTable(snowflakeService, stagingBucketDir));
+    }
+
+    private PCollection<String> writeFiles(PCollection<T> input, String stagingBucketDir) {
+
+      PCollection<String> mappedUserData =
+          input
+              .apply(
+                  MapElements.via(
+                      new SimpleFunction<T, Object[]>() {
+                        @Override
+                        public Object[] apply(T element) {
+                          return getUserDataMapper().mapRow(element);
+                        }
+                      }))
+              .apply("Map Objects array to CSV lines", ParDo.of(new MapObjectsArrayToCsvFn()))
+              .setCoder(StringUtf8Coder.of());
+
+      WriteFilesResult filesResult =
+          mappedUserData.apply(
+              "Write files to specified location",
+              FileIO.<String>write()
+                  .via(TextIO.sink())
+                  .to(stagingBucketDir)
+                  .withPrefix(getFileNameTemplate())
+                  .withSuffix(".csv")
+                  .withCompression(Compression.GZIP));
+
+      return (PCollection)
+          filesResult
+              .getPerDestinationOutputFilenames()
+              .apply("Parse KV filenames to Strings", Values.<String>create());
+    }
+
+    private ParDo.SingleOutput<Object, Object> copyToTable(
+        SnowflakeService snowflakeService, String stagingBucketDir) {
+      return ParDo.of(
+          new CopyToTableFn<>(
+              getDataSourceProviderFn(),
+              getTable(),
+              getQuery(),
+              stagingBucketDir,
+              getStorageIntegrationName(),
+              getWriteDisposition(),
+              snowflakeService));
+    }
+  }
+
+  public static class Concatenate extends Combine.CombineFn<String, List<String>, List<String>> {
+    @Override
+    public List<String> createAccumulator() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public List<String> addInput(List<String> mutableAccumulator, String input) {
+      mutableAccumulator.add(String.format("'%s'", input));
+      return mutableAccumulator;
+    }
+
+    @Override
+    public List<String> mergeAccumulators(Iterable<List<String>> accumulators) {
+      List<String> result = createAccumulator();
+      for (List<String> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
+
+    @Override
+    public List<String> extractOutput(List<String> accumulator) {
+      return accumulator;
+    }
+  }
+
+  /**
+   * Custom DoFn that maps {@link Object[]} into CSV line to be saved to Snowflake.
+   *
+   * <p>Adds Snowflake-specific quotations around strings.
+   */
+  private static class MapObjectsArrayToCsvFn extends DoFn<Object[], String> {
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      List<Object> csvItems = new ArrayList<>();
+      for (Object o : context.element()) {
+        if (o instanceof String) {
+          String field = (String) o;
+          field = field.replace("'", "''");
+          field = quoteField(field);
+
+          csvItems.add(field);
+        } else {
+          csvItems.add(o);
+        }
+      }
+      context.output(Joiner.on(",").useForNull("").join(csvItems));

Review comment:
       @RyanSkraba thanks for pointing this out. We're going to add this concern to our feature improvement list :) 







[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r432407631



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/package-info.java
##########
@@ -0,0 +1,2 @@
+/** Snowflake IO services and POJOs. */

Review comment:
       Thanks for spotting this. 







[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r432413521



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/Location.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import java.io.Serializable;
+
+/**
+ * Used as one of the arguments for {@link org.apache.beam.sdk.io.snowflake.SnowflakeIO} write and
+ * read operations. It keeps information about storage integration and staging bucket name.
+ * Integration name is Snowflake storage integration object created according to Snowflake
+ * documentation for the GCS bucket. Staging bucket name is Google Cloud Storage bucket which in the
+ * case of writing operation will be used to save CSV files which will end up in Snowflake under
+ * “staging_bucket_name/data” path and in the case of reading operation will be used as a temporary
+ * location for storing CSV files named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` which will be removed
+ * automatically once Read operation finishes.
+ */
+public class Location implements Serializable {
+  private String storageIntegrationName;
+  private String stagingBucketName;
+
+  public static Location of(SnowflakePipelineOptions options) {
+    return new Location(options.getStorageIntegrationName(), options.getStagingBucketName());
+  }
+
+  public static Location of(String storageIntegrationName, String stagingBucketName) {

Review comment:
       I added back the removed link https://docs.snowflake.com/en/sql-reference/sql/create-storage-integration.html. Thanks 







[GitHub] [beam] RyanSkraba commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
RyanSkraba commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r430983300



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -172,10 +230,7 @@
     abstract String getTable();
 
     @Nullable
-    abstract String getIntegrationName();

Review comment:
       Not backwards compatible on a non-`@Experimental` class.  Some of these would be easy to implement on top of `Location` with a `@Deprecated` tag? 
   
   (My personal opinion would be not to worry too much about it... put `@Experimental` on the SnowflakeIO?)
   
   That being said, refactoring the two into a `Location` POJO doesn't really add a LOT of value and it might be more worthwhile to put it back to two separate configs.  Up to you!
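   A minimal sketch of the deprecation idea, assuming the transform keeps `getLocation()`/`via(Location)` as added in this PR and the `Location.of(storageIntegrationName, stagingBucketName)` factory stays; the surrounding builder plumbing is elided:

    ```java
    /** @deprecated Use {@link #via(Location)} instead. */
    @Deprecated
    public Read<T> withStagingBucketName(String stagingBucketName) {
      Location current = getLocation();
      String integration = (current == null) ? null : current.getStorageIntegrationName();
      return via(Location.of(integration, stagingBucketName));
    }

    /** @deprecated Use {@link #via(Location)} instead. */
    @Deprecated
    public Read<T> withIntegrationName(String integrationName) {
      Location current = getLocation();
      String bucket = (current == null) ? null : current.getStagingBucketName();
      return via(Location.of(integrationName, bucket));
    }
    ```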

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/Location.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import java.io.Serializable;
+
+/**
+ * Used as one of the arguments for {@link org.apache.beam.sdk.io.snowflake.SnowflakeIO} write and
+ * read operations. It keeps information about storage integration and staging bucket name.
+ * Integration name is Snowflake storage integration object created according to Snowflake
+ * documentation for the GCS bucket. Staging bucket name is Google Cloud Storage bucket which in the
+ * case of writing operation will be used to save CSV files which will end up in Snowflake under
+ * “staging_bucket_name/data” path and in the case of reading operation will be used as a temporary
+ * location for storing CSV files named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` which will be removed
+ * automatically once Read operation finishes.
+ */
+public class Location implements Serializable {
+  private String storageIntegrationName;
+  private String stagingBucketName;
+
+  public static Location of(SnowflakePipelineOptions options) {
+    return new Location(options.getStorageIntegrationName(), options.getStagingBucketName());
+  }
+
+  public static Location of(String storageIntegrationName, String stagingBucketName) {

Review comment:
       The original `withStagingBucketName` and `withIntegrationName` had some useful javadoc and links!

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/CSVSink.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.Charset;
+import java.util.List;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+
+/** Implementation of {@link org.apache.beam.sdk.io.FileIO.Sink} for writing CSV. */
+public class CSVSink implements FileIO.Sink<String> {

Review comment:
       Just a little nitpick!  This class really has nothing to do with CSV, other than writing a header line.  If you're going to add a schema (orient it towards Beam Row for example), you might want to swap it out for an Avro or Parquet file storage!
   
   This would be a big change.  If you were to make this class private to your implementation, do you think it would be possible to swap it out for a schema-capable format transparently later?
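   A minimal sketch of that direction, assuming only the PR's `CSVSink implements FileIO.Sink<String>` and the no-arg constructor used in the diff; the rest of the transform is elided:

    ```java
    // If the sink choice is hidden behind a private factory inside the transform,
    // the staged-file format stays an implementation detail and could later be
    // replaced by a schema-aware sink without touching the public API.
    private FileIO.Sink<String> stagingSink() {
      // Today: the PR's CSV lines. A later version could return a different
      // FileIO.Sink once elements carry a schema (e.g. Beam Row).
      return new CSVSink();
    }
    ```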

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/package-info.java
##########
@@ -0,0 +1,2 @@
+/** Snowflake IO services and POJOs. */

Review comment:
       ```suggestion
   /*
    * Licensed to the Apache Software Foundation (ASF) under one
    * or more contributor license agreements.  See the NOTICE file
    * distributed with this work for additional information
    * regarding copyright ownership.  The ASF licenses this file
    * to you under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License.  You may obtain a copy of the License at
    *
    *     http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
   
   /** Snowflake IO services and POJOs. */
   ```

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -339,10 +360,31 @@
       emptyCollection
           .apply(Wait.on(output))
           .apply(ParDo.of(new CleanTmpFilesFromGcsFn(stagingBucketDir)));
-
       return output;
     }
 
+    private void checkArguments(Location loc) {
+      // Either table or query is required. If query is present, it's being used, table is used
+      // otherwise
+      checkArgument(loc != null, "via() is required");
+      checkArgument(
+          loc.getStorageIntegrationName() != null,

Review comment:
       Hello!  Refactoring this part out into a separate method is odd -- the `loc` configuration is passed as a parameter while all the rest of the configurations are accessed as methods.  Were you going to reuse the location check at some point? As it is, it's inconsistent.
   
   Also, this asserts that storageIntegrationName must not be null.  Is that true?  Later, it's checked against null in `SnowflakeServiceImpl`.

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -447,6 +494,346 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #write()}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract Location getLocation();
+
+    @Nullable
+    abstract String getFileNameTemplate();
+
+    @Nullable
+    abstract WriteDisposition getWriteDisposition();
+
+    @Nullable
+    abstract UserDataMapper getUserDataMapper();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setLocation(Location location);
+
+      abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
+
+      abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+
+      abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Write<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Write<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Write<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A table name to be written in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Write<T> to(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Write<T> withQueryTransformation(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A location object which contains connection config between Snowflake and GCP.
+     *
+     * @param location - an instance of {@link Location}.
+     */
+    public Write<T> via(Location location) {
+      return toBuilder().setLocation(location).build();
+    }
+
+    /**
+     * A template name for files saved to GCP.
+     *
+     * @param fileNameTemplate - String with template name for files.
+     */
+    public Write<T> withFileNameTemplate(String fileNameTemplate) {
+      return toBuilder().setFileNameTemplate(fileNameTemplate).build();
+    }
+
+    /**
+     * User-defined function mapping user data into CSV lines.
+     *
+     * @param userDataMapper - an instance of {@link UserDataMapper}.
+     */
+    public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+      return toBuilder().setUserDataMapper(userDataMapper).build();
+    }
+
+    /**
+     * A disposition to be used during writing to table phase.
+     *
+     * @param writeDisposition - an instance of {@link WriteDisposition}.
+     */
+    public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
+      return toBuilder().setWriteDisposition(writeDisposition).build();
+    }
+
+    /**
+     * A snowflake service which is supposed to be used. Note: Currently we have {@link
+     * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
+     *
+     * @param snowflakeService - an instance of {@link SnowflakeService}.
+     */
+    public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
+      return toBuilder().setSnowflakeService(snowflakeService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      Location loc = getLocation();
+      checkArguments(loc);
+
+      String stagingBucketDir = String.format("%s/%s/", loc.getStagingBucketName(), WRITE_TMP_PATH);
+
+      PCollection out = write(input, stagingBucketDir);
+      out.setCoder(StringUtf8Coder.of());
+
+      return PDone.in(out.getPipeline());
+    }
+
+    private void checkArguments(Location loc) {
+      checkArgument(loc != null, "via() is required");
+      checkArgument(
+          loc.getStorageIntegrationName() != null,
+          "location with storageIntegrationName is required");

Review comment:
       Likewise -- can this be nullable?

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -447,6 +494,346 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #write()}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract Location getLocation();
+
+    @Nullable
+    abstract String getFileNameTemplate();
+
+    @Nullable
+    abstract WriteDisposition getWriteDisposition();
+
+    @Nullable
+    abstract UserDataMapper getUserDataMapper();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setLocation(Location location);
+
+      abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
+
+      abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+
+      abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Write<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Write<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Write<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A table name to be written in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Write<T> to(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Write<T> withQueryTransformation(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A location object which contains connection config between Snowflake and GCP.
+     *
+     * @param location - an instance of {@link Location}.
+     */
+    public Write<T> via(Location location) {
+      return toBuilder().setLocation(location).build();
+    }
+
+    /**
+     * A template name for files saved to GCP.
+     *
+     * @param fileNameTemplate - String with template name for files.
+     */
+    public Write<T> withFileNameTemplate(String fileNameTemplate) {
+      return toBuilder().setFileNameTemplate(fileNameTemplate).build();
+    }
+
+    /**
+     * User-defined function mapping user data into CSV lines.
+     *
+     * @param userDataMapper - an instance of {@link UserDataMapper}.
+     */
+    public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+      return toBuilder().setUserDataMapper(userDataMapper).build();
+    }
+
+    /**
+     * A disposition to be used during writing to table phase.
+     *
+     * @param writeDisposition - an instance of {@link WriteDisposition}.
+     */
+    public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
+      return toBuilder().setWriteDisposition(writeDisposition).build();
+    }
+
+    /**
+     * A snowflake service which is supposed to be used. Note: Currently we have {@link
+     * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
+     *
+     * @param snowflakeService - an instance of {@link SnowflakeService}.
+     */
+    public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
+      return toBuilder().setSnowflakeService(snowflakeService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      Location loc = getLocation();
+      checkArguments(loc);
+
+      String stagingBucketDir = String.format("%s/%s/", loc.getStagingBucketName(), WRITE_TMP_PATH);
+
+      PCollection out = write(input, stagingBucketDir);
+      out.setCoder(StringUtf8Coder.of());
+
+      return PDone.in(out.getPipeline());
+    }
+
+    private void checkArguments(Location loc) {
+      checkArgument(loc != null, "via() is required");
+      checkArgument(
+          loc.getStorageIntegrationName() != null,
+          "location with storageIntegrationName is required");
+      checkArgument(
+          loc.getStagingBucketName() != null, "location with stagingBucketName is required");
+
+      checkArgument(getUserDataMapper() != null, "withUserDataMapper() is required");
+
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      checkArgument(getTable() != null, "withTable() is required");
+    }
+
+    private PCollection write(PCollection input, String stagingBucketDir) {
+      SnowflakeService snowflakeService =
+          getSnowflakeService() != null ? getSnowflakeService() : new SnowflakeServiceImpl();
+
+      PCollection files = writeFiles(input, stagingBucketDir);
+
+      files =
+          (PCollection)
+              files.apply("Create list of files to copy", Combine.globally(new Concatenate()));
+
+      return (PCollection)
+          files.apply("Copy files to table", copyToTable(snowflakeService, stagingBucketDir));
+    }
+
+    private PCollection writeFiles(PCollection<T> input, String stagingBucketDir) {
+      class Parse extends DoFn<KV<T, String>, String> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          c.output(c.element().getValue());
+        }
+      }
+
+      PCollection mappedUserData =

Review comment:
       ```suggestion
         PCollection<String> mappedUserData =
   ```
   Avoid raw types where possible!  This should be true throughout the code.

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakePipelineOptions.java
##########
@@ -111,23 +111,13 @@
 
   void setLoginTimeout(String loginTimeout);
 
-  @Description("External location name to connect to.")
-  String getExternalLocation();
-
-  void setExternalLocation(String externalLocation);
-
-  @Description("Temporary GCS bucket name")
+  @Description("Temporary GCS bucket name.")
   String getStagingBucketName();
 
   void setStagingBucketName(String stagingBucketName);
 
-  @Description("Storage integration - required in case the external stage is not specified.")
-  String getStorageIntegration();
-
-  void setStorageIntegration(String integration);
-
-  @Description("Stage name. Optional.")
-  String getStage();
+  @Description("Storage integration name")
+  String getStorageIntegrationName();

Review comment:
       Just to be sure you're aware!  This breaks users that are passing this information from the command line using the `--stage` option.

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -447,6 +494,346 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #write()}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract Location getLocation();
+
+    @Nullable
+    abstract String getFileNameTemplate();
+
+    @Nullable
+    abstract WriteDisposition getWriteDisposition();
+
+    @Nullable
+    abstract UserDataMapper getUserDataMapper();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setLocation(Location location);
+
+      abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
+
+      abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+
+      abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Write<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Write<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Write<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A table name to be written in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Write<T> to(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Write<T> withQueryTransformation(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A location object which contains connection config between Snowflake and GCP.
+     *
+     * @param location - an instance of {@link Location}.
+     */
+    public Write<T> via(Location location) {
+      return toBuilder().setLocation(location).build();
+    }
+
+    /**
+     * A template name for files saved to GCP.
+     *
+     * @param fileNameTemplate - String with template name for files.
+     */
+    public Write<T> withFileNameTemplate(String fileNameTemplate) {
+      return toBuilder().setFileNameTemplate(fileNameTemplate).build();
+    }
+
+    /**
+     * User-defined function mapping user data into CSV lines.
+     *
+     * @param userDataMapper - an instance of {@link UserDataMapper}.
+     */
+    public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+      return toBuilder().setUserDataMapper(userDataMapper).build();
+    }
+
+    /**
+     * A disposition to be used during writing to table phase.
+     *
+     * @param writeDisposition - an instance of {@link WriteDisposition}.
+     */
+    public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
+      return toBuilder().setWriteDisposition(writeDisposition).build();
+    }
+
+    /**
+     * A snowflake service which is supposed to be used. Note: Currently we have {@link
+     * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
+     *
+     * @param snowflakeService - an instance of {@link SnowflakeService}.
+     */
+    public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
+      return toBuilder().setSnowflakeService(snowflakeService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      Location loc = getLocation();
+      checkArguments(loc);
+
+      String stagingBucketDir = String.format("%s/%s/", loc.getStagingBucketName(), WRITE_TMP_PATH);
+
+      PCollection out = write(input, stagingBucketDir);
+      out.setCoder(StringUtf8Coder.of());
+
+      return PDone.in(out.getPipeline());
+    }
+
+    private void checkArguments(Location loc) {
+      checkArgument(loc != null, "via() is required");
+      checkArgument(
+          loc.getStorageIntegrationName() != null,
+          "location with storageIntegrationName is required");
+      checkArgument(
+          loc.getStagingBucketName() != null, "location with stagingBucketName is required");
+
+      checkArgument(getUserDataMapper() != null, "withUserDataMapper() is required");
+
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      checkArgument(getTable() != null, "withTable() is required");
+    }
+
+    private PCollection write(PCollection input, String stagingBucketDir) {
+      SnowflakeService snowflakeService =
+          getSnowflakeService() != null ? getSnowflakeService() : new SnowflakeServiceImpl();
+
+      PCollection files = writeFiles(input, stagingBucketDir);
+
+      files =
+          (PCollection)
+              files.apply("Create list of files to copy", Combine.globally(new Concatenate()));
+
+      return (PCollection)
+          files.apply("Copy files to table", copyToTable(snowflakeService, stagingBucketDir));
+    }
+
+    private PCollection writeFiles(PCollection<T> input, String stagingBucketDir) {
+      class Parse extends DoFn<KV<T, String>, String> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          c.output(c.element().getValue());
+        }
+      }
+
+      PCollection mappedUserData =
+          input
+              .apply(
+                  "Map user data to Objects array",
+                  ParDo.of(new MapUserDataObjectsArrayFn<T>(getUserDataMapper())))
+              .apply("Map Objects array to CSV lines", ParDo.of(new MapObjectsArrayToCsvFn()))
+              .setCoder(StringUtf8Coder.of());
+
+      WriteFilesResult filesResult =
+          (WriteFilesResult)
+              mappedUserData.apply(
+                  "Write files to specified location",
+                  FileIO.write()
+                      .via((FileIO.Sink) new CSVSink())
+                      .to(stagingBucketDir)
+                      .withPrefix(getFileNameTemplate())
+                      .withSuffix(".csv")
+                      .withPrefix(UUID.randomUUID().toString().subSequence(0, 8).toString())
+                      .withCompression(Compression.GZIP));
+
+      return (PCollection)
+          filesResult
+              .getPerDestinationOutputFilenames()
+              .apply("Parse KV filenames to Strings", ParDo.of(new Parse()));
+    }
+
+    private ParDo.SingleOutput<Object, Object> copyToTable(
+        SnowflakeService snowflakeService, String stagingBucketDir) {
+      return ParDo.of(
+          new CopyToTableFn<>(
+              getDataSourceProviderFn(),
+              getTable(),
+              getQuery(),
+              stagingBucketDir,
+              getLocation(),
+              getWriteDisposition(),
+              snowflakeService));
+    }
+  }
+
+  public static class Concatenate extends Combine.CombineFn<String, List<String>, List<String>> {
+    @Override
+    public List<String> createAccumulator() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public List<String> addInput(List<String> mutableAccumulator, String input) {
+      mutableAccumulator.add(String.format("'%s'", input));
+      return mutableAccumulator;
+    }
+
+    @Override
+    public List<String> mergeAccumulators(Iterable<List<String>> accumulators) {
+      List<String> result = createAccumulator();
+      for (List<String> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
+
+    @Override
+    public List<String> extractOutput(List<String> accumulator) {
+      return accumulator;
+    }
+  }
+
+  private static class MapUserDataObjectsArrayFn<T> extends DoFn<T, Object[]> {
+    private final UserDataMapper<T> csvMapper;
+
+    public MapUserDataObjectsArrayFn(UserDataMapper<T> csvMapper) {
+      this.csvMapper = csvMapper;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) throws Exception {
+      context.output(csvMapper.mapRow(context.element()));
+    }
+  }
+
+  /**
+   * Custom DoFn that maps {@link Object[]} into CSV line to be saved to Snowflake.
+   *
+   * <p>Adds Snowflake-specific quotations around strings.
+   */
+  private static class MapObjectsArrayToCsvFn extends DoFn<Object[], String> {
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      List<Object> csvItems = new ArrayList<>();
+      for (Object o : context.element()) {
+        if (o instanceof String) {
+          String field = (String) o;
+          field = field.replace("'", "''");
+          field = quoteField(field);
+
+          csvItems.add(field);
+        } else {
+          csvItems.add(o);

Review comment:
       Oooooooo -- relying on the `toString()` representation of a Java instance is _almost_ always dangerous...  At this point, any non-String Object whose `toString()` contains a `'` or `,` is going to break the current CSV line.
   
   Is there any Snowflake doc on the CSV format they're using?  All I could find was related to valid character encodings.
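
    A minimal, self-contained sketch of the failure mode described above (illustrative only, not the PR's code): only String fields are escaped, so any other object whose `toString()` contains a delimiter or quote spills across columns or breaks the quoting.

    ```java
    import java.util.Arrays;
    import java.util.stream.Collectors;

    public class CsvQuotingSketch {
      // Mirrors the current behaviour: only String fields are quoted and escaped.
      static String naiveLine(Object[] row) {
        return Arrays.stream(row)
            .map(o -> o instanceof String
                ? "'" + ((String) o).replace("'", "''") + "'"
                : String.valueOf(o))
            .collect(Collectors.joining(","));
      }

      // One possible fix: quote and escape every field, regardless of its type.
      static String quotedLine(Object[] row) {
        return Arrays.stream(row)
            .map(o -> "'" + String.valueOf(o).replace("'", "''") + "'")
            .collect(Collectors.joining(","));
      }

      public static void main(String[] args) {
        Object[] row = {1, Arrays.asList("a", "b"), "it's"};
        System.out.println(naiveLine(row));  // 1,[a, b],'it''s'     -> the list spills into two columns
        System.out.println(quotedLine(row)); // '1','[a, b]','it''s' -> stays at three columns
      }
    }
    ```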

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/Location.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import java.io.Serializable;
+
+/**
+ * Used as one of the arguments for {@link org.apache.beam.sdk.io.snowflake.SnowflakeIO} write and
+ * read operations. It keeps information about storage integration and staging bucket name.
+ * Integration name is Snowflake storage integration object created according to Snowflake
+ * documentation for the GCS bucket. Staging bucket name is Google Cloud Storage bucket which in the
+ * case of writing operation will be used to save CSV files which will end up in Snowflake under
+ * “staging_bucket_name/data” path and in the case of reading operation will be used as a temporary
+ * location for storing CSV files named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` which will be removed
+ * automatically once Read operation finishes.
+ */
+public class Location implements Serializable {
+  private String storageIntegrationName;

Review comment:
       It's not mandatory, but I've gotten used to seeing `@Nullable` annotations in Beam code!  This field looks like it can legitimately be null, but the staging bucket name doesn't.
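
    For illustration, a minimal sketch of the annotated fields (assuming `javax.annotation.Nullable`, which other Beam IOs commonly use; not the actual PR code):

    ```java
    import java.io.Serializable;
    import javax.annotation.Nullable;

    public class Location implements Serializable {
      // May legitimately be absent in some configurations.
      @Nullable private String storageIntegrationName;

      // Required for both read and write, so left non-nullable.
      private String stagingBucketName;
    }
    ```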

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -257,23 +301,12 @@
     }
 
     /**
-     * Name of the cloud bucket (GCS by now) to use as tmp location of CSVs during COPY statement.
-     *
-     * @param stagingBucketName - String with the name of the bucket.
-     */
-    public Read<T> withStagingBucketName(String stagingBucketName) {
-      return toBuilder().setStagingBucketName(stagingBucketName).build();
-    }
-
-    /**
-     * Name of the Storage Integration in Snowflake to be used. See
-     * https://docs.snowflake.com/en/sql-reference/sql/create-storage-integration.html for
-     * reference.
+     * A location object which contains connection config between Snowflake and GCP.
      *
-     * @param integrationName - String with the name of the Storage Integration.
+     * @param location - an instance of {@link Location}.
      */
-    public Read<T> withIntegrationName(String integrationName) {
-      return toBuilder().setIntegrationName(integrationName).build();
+    public Read<T> via(Location location) {

Review comment:
       I know we use `via` elsewhere in Beam code, but mostly for callback functions -- any reason why this wouldn't just be `withLocation` ?
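
    For reference, the rename would be a one-line change on the builder (sketch only, not part of the PR):

    ```java
    public Read<T> withLocation(Location location) {
      return toBuilder().setLocation(location).build();
    }
    ```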

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/enums/package-info.java
##########
@@ -0,0 +1,2 @@
+/** Snowflake IO data types. */

Review comment:
       ```suggestion
   /*
    * Licensed to the Apache Software Foundation (ASF) under one
    * or more contributor license agreements.  See the NOTICE file
    * distributed with this work for additional information
    * regarding copyright ownership.  The ASF licenses this file
    * to you under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License.  You may obtain a copy of the License at
    *
    *     http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
   
   /** Snowflake IO data types. */
   ```

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -447,6 +494,346 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #write()}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract Location getLocation();
+
+    @Nullable
+    abstract String getFileNameTemplate();
+
+    @Nullable
+    abstract WriteDisposition getWriteDisposition();
+
+    @Nullable
+    abstract UserDataMapper getUserDataMapper();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setLocation(Location location);
+
+      abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
+
+      abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+
+      abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Write<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Write<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Write<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A table name to be written in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Write<T> to(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Write<T> withQueryTransformation(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A location object which contains connection config between Snowflake and GCP.
+     *
+     * @param location - an instance of {@link Location}.
+     */
+    public Write<T> via(Location location) {
+      return toBuilder().setLocation(location).build();
+    }
+
+    /**
+     * A template name for files saved to GCP.
+     *
+     * @param fileNameTemplate - String with template name for files.
+     */
+    public Write<T> withFileNameTemplate(String fileNameTemplate) {
+      return toBuilder().setFileNameTemplate(fileNameTemplate).build();
+    }
+
+    /**
+     * User-defined function mapping user data into CSV lines.
+     *
+     * @param userDataMapper - an instance of {@link UserDataMapper}.
+     */
+    public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+      return toBuilder().setUserDataMapper(userDataMapper).build();
+    }
+
+    /**
+     * A disposition to be used during writing to table phase.
+     *
+     * @param writeDisposition - an instance of {@link WriteDisposition}.
+     */
+    public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
+      return toBuilder().setWriteDisposition(writeDisposition).build();
+    }
+
+    /**
+     * A snowflake service which is supposed to be used. Note: Currently we have {@link
+     * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
+     *
+     * @param snowflakeService - an instance of {@link SnowflakeService}.
+     */
+    public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
+      return toBuilder().setSnowflakeService(snowflakeService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      Location loc = getLocation();
+      checkArguments(loc);
+
+      String stagingBucketDir = String.format("%s/%s/", loc.getStagingBucketName(), WRITE_TMP_PATH);
+
+      PCollection out = write(input, stagingBucketDir);
+      out.setCoder(StringUtf8Coder.of());
+
+      return PDone.in(out.getPipeline());
+    }
+
+    private void checkArguments(Location loc) {
+      checkArgument(loc != null, "via() is required");
+      checkArgument(
+          loc.getStorageIntegrationName() != null,
+          "location with storageIntegrationName is required");
+      checkArgument(
+          loc.getStagingBucketName() != null, "location with stagingBucketName is required");
+
+      checkArgument(getUserDataMapper() != null, "withUserDataMapper() is required");
+
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      checkArgument(getTable() != null, "withTable() is required");

Review comment:
       There's no `withTable()` in `Write`; the message should probably reference `to()`.
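
    i.e. the message could point at the builder method that actually exists (sketch):

    ```java
    checkArgument(getTable() != null, "to() is required");
    ```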

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -447,6 +494,346 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #write()}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract Location getLocation();
+
+    @Nullable
+    abstract String getFileNameTemplate();
+
+    @Nullable
+    abstract WriteDisposition getWriteDisposition();
+
+    @Nullable
+    abstract UserDataMapper getUserDataMapper();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setLocation(Location location);
+
+      abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
+
+      abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+
+      abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Write<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Write<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Write<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A table name to be written in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Write<T> to(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Write<T> withQueryTransformation(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A location object which contains connection config between Snowflake and GCP.
+     *
+     * @param location - an instance of {@link Location}.
+     */
+    public Write<T> via(Location location) {
+      return toBuilder().setLocation(location).build();
+    }
+
+    /**
+     * A template name for files saved to GCP.
+     *
+     * @param fileNameTemplate - String with template name for files.
+     */
+    public Write<T> withFileNameTemplate(String fileNameTemplate) {
+      return toBuilder().setFileNameTemplate(fileNameTemplate).build();
+    }
+
+    /**
+     * User-defined function mapping user data into CSV lines.
+     *
+     * @param userDataMapper - an instance of {@link UserDataMapper}.
+     */
+    public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+      return toBuilder().setUserDataMapper(userDataMapper).build();
+    }
+
+    /**
+     * A disposition to be used during writing to table phase.
+     *
+     * @param writeDisposition - an instance of {@link WriteDisposition}.
+     */
+    public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
+      return toBuilder().setWriteDisposition(writeDisposition).build();
+    }
+
+    /**
+     * A snowflake service which is supposed to be used. Note: Currently we have {@link
+     * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
+     *
+     * @param snowflakeService - an instance of {@link SnowflakeService}.
+     */
+    public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
+      return toBuilder().setSnowflakeService(snowflakeService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      Location loc = getLocation();
+      checkArguments(loc);
+
+      String stagingBucketDir = String.format("%s/%s/", loc.getStagingBucketName(), WRITE_TMP_PATH);
+
+      PCollection out = write(input, stagingBucketDir);
+      out.setCoder(StringUtf8Coder.of());
+
+      return PDone.in(out.getPipeline());
+    }
+
+    private void checkArguments(Location loc) {
+      checkArgument(loc != null, "via() is required");
+      checkArgument(
+          loc.getStorageIntegrationName() != null,
+          "location with storageIntegrationName is required");
+      checkArgument(
+          loc.getStagingBucketName() != null, "location with stagingBucketName is required");
+
+      checkArgument(getUserDataMapper() != null, "withUserDataMapper() is required");
+
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      checkArgument(getTable() != null, "withTable() is required");
+    }
+
+    private PCollection write(PCollection input, String stagingBucketDir) {
+      SnowflakeService snowflakeService =
+          getSnowflakeService() != null ? getSnowflakeService() : new SnowflakeServiceImpl();
+
+      PCollection files = writeFiles(input, stagingBucketDir);
+
+      files =
+          (PCollection)
+              files.apply("Create list of files to copy", Combine.globally(new Concatenate()));
+
+      return (PCollection)
+          files.apply("Copy files to table", copyToTable(snowflakeService, stagingBucketDir));
+    }
+
+    private PCollection writeFiles(PCollection<T> input, String stagingBucketDir) {
+      class Parse extends DoFn<KV<T, String>, String> {

Review comment:
       It looks like you're re-implementing [Values.create()](https://beam.apache.org/releases/javadoc/2.21.0/org/apache/beam/sdk/transforms/Values.html)!  Consider using the existing transform.
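
    A sketch of that simplification, assuming the per-destination filenames arrive as `KV<DestinationT, String>` (uses `org.apache.beam.sdk.transforms.Values`; illustrative only):

    ```java
    PCollection<String> files =
        filesResult
            .getPerDestinationOutputFilenames()
            .apply("Extract filenames", Values.create());
    ```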

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -447,6 +494,346 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #write()}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract Location getLocation();
+
+    @Nullable
+    abstract String getFileNameTemplate();
+
+    @Nullable
+    abstract WriteDisposition getWriteDisposition();
+
+    @Nullable
+    abstract UserDataMapper getUserDataMapper();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setLocation(Location location);
+
+      abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
+
+      abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+
+      abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Write<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Write<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Write<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A table name to be written in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Write<T> to(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Write<T> withQueryTransformation(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A location object which contains connection config between Snowflake and GCP.
+     *
+     * @param location - an instance of {@link Location}.
+     */
+    public Write<T> via(Location location) {
+      return toBuilder().setLocation(location).build();
+    }
+
+    /**
+     * A template name for files saved to GCP.
+     *
+     * @param fileNameTemplate - String with template name for files.
+     */
+    public Write<T> withFileNameTemplate(String fileNameTemplate) {
+      return toBuilder().setFileNameTemplate(fileNameTemplate).build();
+    }
+
+    /**
+     * User-defined function mapping user data into CSV lines.
+     *
+     * @param userDataMapper - an instance of {@link UserDataMapper}.
+     */
+    public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+      return toBuilder().setUserDataMapper(userDataMapper).build();
+    }
+
+    /**
+     * A disposition to be used during writing to table phase.
+     *
+     * @param writeDisposition - an instance of {@link WriteDisposition}.
+     */
+    public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
+      return toBuilder().setWriteDisposition(writeDisposition).build();
+    }
+
+    /**
+     * A snowflake service which is supposed to be used. Note: Currently we have {@link
+     * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
+     *
+     * @param snowflakeService - an instance of {@link SnowflakeService}.
+     */
+    public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
+      return toBuilder().setSnowflakeService(snowflakeService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      Location loc = getLocation();
+      checkArguments(loc);
+
+      String stagingBucketDir = String.format("%s/%s/", loc.getStagingBucketName(), WRITE_TMP_PATH);
+
+      PCollection out = write(input, stagingBucketDir);
+      out.setCoder(StringUtf8Coder.of());
+
+      return PDone.in(out.getPipeline());
+    }
+
+    private void checkArguments(Location loc) {
+      checkArgument(loc != null, "via() is required");
+      checkArgument(
+          loc.getStorageIntegrationName() != null,
+          "location with storageIntegrationName is required");
+      checkArgument(
+          loc.getStagingBucketName() != null, "location with stagingBucketName is required");
+
+      checkArgument(getUserDataMapper() != null, "withUserDataMapper() is required");
+
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      checkArgument(getTable() != null, "withTable() is required");
+    }
+
+    private PCollection write(PCollection input, String stagingBucketDir) {
+      SnowflakeService snowflakeService =
+          getSnowflakeService() != null ? getSnowflakeService() : new SnowflakeServiceImpl();
+
+      PCollection files = writeFiles(input, stagingBucketDir);
+
+      files =
+          (PCollection)
+              files.apply("Create list of files to copy", Combine.globally(new Concatenate()));
+
+      return (PCollection)
+          files.apply("Copy files to table", copyToTable(snowflakeService, stagingBucketDir));
+    }
+
+    private PCollection writeFiles(PCollection<T> input, String stagingBucketDir) {
+      class Parse extends DoFn<KV<T, String>, String> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          c.output(c.element().getValue());
+        }
+      }
+
+      PCollection mappedUserData =
+          input
+              .apply(
+                  "Map user data to Objects array",
+                  ParDo.of(new MapUserDataObjectsArrayFn<T>(getUserDataMapper())))
+              .apply("Map Objects array to CSV lines", ParDo.of(new MapObjectsArrayToCsvFn()))
+              .setCoder(StringUtf8Coder.of());
+
+      WriteFilesResult filesResult =
+          (WriteFilesResult)
+              mappedUserData.apply(
+                  "Write files to specified location",
+                  FileIO.write()
+                      .via((FileIO.Sink) new CSVSink())
+                      .to(stagingBucketDir)
+                      .withPrefix(getFileNameTemplate())
+                      .withSuffix(".csv")
+                      .withPrefix(UUID.randomUUID().toString().subSequence(0, 8).toString())
+                      .withCompression(Compression.GZIP));
+
+      return (PCollection)
+          filesResult
+              .getPerDestinationOutputFilenames()
+              .apply("Parse KV filenames to Strings", ParDo.of(new Parse()));
+    }
+
+    private ParDo.SingleOutput<Object, Object> copyToTable(
+        SnowflakeService snowflakeService, String stagingBucketDir) {
+      return ParDo.of(
+          new CopyToTableFn<>(
+              getDataSourceProviderFn(),
+              getTable(),
+              getQuery(),
+              stagingBucketDir,
+              getLocation(),
+              getWriteDisposition(),
+              snowflakeService));
+    }
+  }
+
+  public static class Concatenate extends Combine.CombineFn<String, List<String>, List<String>> {

Review comment:
       Hmmmm -- a common pattern for processing a small list of results is to treat it as a side input instead of collecting it into a singleton collection.  I don't have a really strong opinion about this, but someone might want to weigh in!
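
    A rough sketch of that side-input alternative, with illustrative names only (the PR keeps the `Combine.globally` approach):

    ```java
    // Collect the written file names into a view instead of a singleton PCollection.
    PCollectionView<List<String>> filesView =
        files.apply("Collect file names", View.asList());

    // A single dummy element triggers one copy step that reads the view.
    input
        .getPipeline()
        .apply("Single trigger", Create.of("copy"))
        .apply(
            "Copy files to table",
            ParDo.of(
                    new DoFn<String, Void>() {
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        List<String> fileNames = c.sideInput(filesView);
                        // issue the COPY INTO statement for fileNames here
                      }
                    })
                .withSideInputs(filesView));
    ```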

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -447,6 +494,346 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #write()}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract Location getLocation();
+
+    @Nullable
+    abstract String getFileNameTemplate();
+
+    @Nullable
+    abstract WriteDisposition getWriteDisposition();
+
+    @Nullable
+    abstract UserDataMapper getUserDataMapper();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setLocation(Location location);
+
+      abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
+
+      abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+
+      abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Write<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Write<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Write<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A table name to be written in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Write<T> to(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Write<T> withQueryTransformation(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A location object which contains connection config between Snowflake and GCP.
+     *
+     * @param location - an instance of {@link Location}.
+     */
+    public Write<T> via(Location location) {
+      return toBuilder().setLocation(location).build();
+    }
+
+    /**
+     * A template name for files saved to GCP.
+     *
+     * @param fileNameTemplate - String with template name for files.
+     */
+    public Write<T> withFileNameTemplate(String fileNameTemplate) {
+      return toBuilder().setFileNameTemplate(fileNameTemplate).build();
+    }
+
+    /**
+     * User-defined function mapping user data into CSV lines.
+     *
+     * @param userDataMapper - an instance of {@link UserDataMapper}.
+     */
+    public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+      return toBuilder().setUserDataMapper(userDataMapper).build();
+    }
+
+    /**
+     * A disposition to be used during writing to table phase.
+     *
+     * @param writeDisposition - an instance of {@link WriteDisposition}.
+     */
+    public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
+      return toBuilder().setWriteDisposition(writeDisposition).build();
+    }
+
+    /**
+     * A snowflake service which is supposed to be used. Note: Currently we have {@link
+     * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
+     *
+     * @param snowflakeService - an instance of {@link SnowflakeService}.
+     */
+    public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
+      return toBuilder().setSnowflakeService(snowflakeService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      Location loc = getLocation();
+      checkArguments(loc);
+
+      String stagingBucketDir = String.format("%s/%s/", loc.getStagingBucketName(), WRITE_TMP_PATH);
+
+      PCollection out = write(input, stagingBucketDir);
+      out.setCoder(StringUtf8Coder.of());
+
+      return PDone.in(out.getPipeline());
+    }
+
+    private void checkArguments(Location loc) {
+      checkArgument(loc != null, "via() is required");
+      checkArgument(
+          loc.getStorageIntegrationName() != null,
+          "location with storageIntegrationName is required");
+      checkArgument(
+          loc.getStagingBucketName() != null, "location with stagingBucketName is required");
+
+      checkArgument(getUserDataMapper() != null, "withUserDataMapper() is required");
+
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      checkArgument(getTable() != null, "withTable() is required");
+    }
+
+    private PCollection write(PCollection input, String stagingBucketDir) {

Review comment:
       ```suggestion
       private PCollection write(PCollection<T> input, String stagingBucketDir) {
   ```
   I'd consider collapsing this method into `expand` (the caller) or combining it with `writeFiles` below.  It doesn't add much by itself.

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -447,6 +494,346 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #write()}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract Location getLocation();
+
+    @Nullable
+    abstract String getFileNameTemplate();
+
+    @Nullable
+    abstract WriteDisposition getWriteDisposition();
+
+    @Nullable
+    abstract UserDataMapper getUserDataMapper();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setLocation(Location location);
+
+      abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
+
+      abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+
+      abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Write<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Write<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Write<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A table name to be written in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Write<T> to(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Write<T> withQueryTransformation(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A location object which contains connection config between Snowflake and GCP.
+     *
+     * @param location - an instance of {@link Location}.
+     */
+    public Write<T> via(Location location) {
+      return toBuilder().setLocation(location).build();
+    }
+
+    /**
+     * A template name for files saved to GCP.
+     *
+     * @param fileNameTemplate - String with template name for files.
+     */
+    public Write<T> withFileNameTemplate(String fileNameTemplate) {
+      return toBuilder().setFileNameTemplate(fileNameTemplate).build();
+    }
+
+    /**
+     * User-defined function mapping user data into CSV lines.
+     *
+     * @param userDataMapper - an instance of {@link UserDataMapper}.
+     */
+    public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+      return toBuilder().setUserDataMapper(userDataMapper).build();
+    }
+
+    /**
+     * A disposition to be used during writing to table phase.
+     *
+     * @param writeDisposition - an instance of {@link WriteDisposition}.
+     */
+    public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
+      return toBuilder().setWriteDisposition(writeDisposition).build();
+    }
+
+    /**
+     * A snowflake service which is supposed to be used. Note: Currently we have {@link
+     * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
+     *
+     * @param snowflakeService - an instance of {@link SnowflakeService}.
+     */
+    public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
+      return toBuilder().setSnowflakeService(snowflakeService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      Location loc = getLocation();
+      checkArguments(loc);
+
+      String stagingBucketDir = String.format("%s/%s/", loc.getStagingBucketName(), WRITE_TMP_PATH);
+
+      PCollection out = write(input, stagingBucketDir);
+      out.setCoder(StringUtf8Coder.of());
+
+      return PDone.in(out.getPipeline());
+    }
+
+    private void checkArguments(Location loc) {
+      checkArgument(loc != null, "via() is required");
+      checkArgument(
+          loc.getStorageIntegrationName() != null,
+          "location with storageIntegrationName is required");
+      checkArgument(
+          loc.getStagingBucketName() != null, "location with stagingBucketName is required");
+
+      checkArgument(getUserDataMapper() != null, "withUserDataMapper() is required");
+
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      checkArgument(getTable() != null, "withTable() is required");
+    }
+
+    private PCollection write(PCollection input, String stagingBucketDir) {
+      SnowflakeService snowflakeService =
+          getSnowflakeService() != null ? getSnowflakeService() : new SnowflakeServiceImpl();
+
+      PCollection files = writeFiles(input, stagingBucketDir);
+
+      files =
+          (PCollection)
+              files.apply("Create list of files to copy", Combine.globally(new Concatenate()));
+
+      return (PCollection)
+          files.apply("Copy files to table", copyToTable(snowflakeService, stagingBucketDir));
+    }
+
+    private PCollection writeFiles(PCollection<T> input, String stagingBucketDir) {
+      class Parse extends DoFn<KV<T, String>, String> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          c.output(c.element().getValue());
+        }
+      }
+
+      PCollection mappedUserData =
+          input
+              .apply(
+                  "Map user data to Objects array",
+                  ParDo.of(new MapUserDataObjectsArrayFn<T>(getUserDataMapper())))
+              .apply("Map Objects array to CSV lines", ParDo.of(new MapObjectsArrayToCsvFn()))
+              .setCoder(StringUtf8Coder.of());
+
+      WriteFilesResult filesResult =
+          (WriteFilesResult)
+              mappedUserData.apply(
+                  "Write files to specified location",
+                  FileIO.write()
+                      .via((FileIO.Sink) new CSVSink())
+                      .to(stagingBucketDir)
+                      .withPrefix(getFileNameTemplate())
+                      .withSuffix(".csv")
+                      .withPrefix(UUID.randomUUID().toString().subSequence(0, 8).toString())
+                      .withCompression(Compression.GZIP));
+
+      return (PCollection)
+          filesResult
+              .getPerDestinationOutputFilenames()
+              .apply("Parse KV filenames to Strings", ParDo.of(new Parse()));
+    }
+
+    private ParDo.SingleOutput<Object, Object> copyToTable(
+        SnowflakeService snowflakeService, String stagingBucketDir) {
+      return ParDo.of(
+          new CopyToTableFn<>(
+              getDataSourceProviderFn(),
+              getTable(),
+              getQuery(),
+              stagingBucketDir,
+              getLocation(),
+              getWriteDisposition(),
+              snowflakeService));
+    }
+  }
+
+  public static class Concatenate extends Combine.CombineFn<String, List<String>, List<String>> {
+    @Override
+    public List<String> createAccumulator() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public List<String> addInput(List<String> mutableAccumulator, String input) {
+      mutableAccumulator.add(String.format("'%s'", input));
+      return mutableAccumulator;
+    }
+
+    @Override
+    public List<String> mergeAccumulators(Iterable<List<String>> accumulators) {
+      List<String> result = createAccumulator();
+      for (List<String> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
+
+    @Override
+    public List<String> extractOutput(List<String> accumulator) {
+      return accumulator;
+    }
+  }
+
+  private static class MapUserDataObjectsArrayFn<T> extends DoFn<T, Object[]> {

Review comment:
       Consider reusing a `MapElements.via(t -> csvMapper.mapRow(t))` instead.
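
    A sketch of that alternative, assuming `UserDataMapper#mapRow` declares no checked exceptions and the mapper is serializable (the PR keeps the explicit DoFn):

    ```java
    UserDataMapper<T> mapper = getUserDataMapper();
    PCollection<Object[]> mapped =
        input.apply(
            "Map user data to Objects array",
            MapElements.into(TypeDescriptor.of(Object[].class))
                .via((T element) -> mapper.mapRow(element)));
    ```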







[GitHub] [beam] kkucharc commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
kkucharc commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r438289586



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -447,6 +494,346 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #write()}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract Location getLocation();
+
+    @Nullable
+    abstract String getFileNameTemplate();
+
+    @Nullable
+    abstract WriteDisposition getWriteDisposition();
+
+    @Nullable
+    abstract UserDataMapper getUserDataMapper();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setLocation(Location location);
+
+      abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
+
+      abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+
+      abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Write<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Write<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Write<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A table name to be written in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Write<T> to(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Write<T> withQueryTransformation(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A location object which contains connection config between Snowflake and GCP.
+     *
+     * @param location - an instance of {@link Location}.
+     */
+    public Write<T> via(Location location) {
+      return toBuilder().setLocation(location).build();
+    }
+
+    /**
+     * A template name for files saved to GCP.
+     *
+     * @param fileNameTemplate - String with template name for files.
+     */
+    public Write<T> withFileNameTemplate(String fileNameTemplate) {
+      return toBuilder().setFileNameTemplate(fileNameTemplate).build();
+    }
+
+    /**
+     * User-defined function mapping user data into CSV lines.
+     *
+     * @param userDataMapper - an instance of {@link UserDataMapper}.
+     */
+    public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+      return toBuilder().setUserDataMapper(userDataMapper).build();
+    }
+
+    /**
+     * A disposition to be used during writing to table phase.
+     *
+     * @param writeDisposition - an instance of {@link WriteDisposition}.
+     */
+    public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
+      return toBuilder().setWriteDisposition(writeDisposition).build();
+    }
+
+    /**
+     * A snowflake service which is supposed to be used. Note: Currently we have {@link
+     * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
+     *
+     * @param snowflakeService - an instance of {@link SnowflakeService}.
+     */
+    public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
+      return toBuilder().setSnowflakeService(snowflakeService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      Location loc = getLocation();
+      checkArguments(loc);
+
+      String stagingBucketDir = String.format("%s/%s/", loc.getStagingBucketName(), WRITE_TMP_PATH);
+
+      PCollection out = write(input, stagingBucketDir);
+      out.setCoder(StringUtf8Coder.of());
+
+      return PDone.in(out.getPipeline());
+    }
+
+    private void checkArguments(Location loc) {
+      checkArgument(loc != null, "via() is required");
+      checkArgument(
+          loc.getStorageIntegrationName() != null,
+          "location with storageIntegrationName is required");
+      checkArgument(
+          loc.getStagingBucketName() != null, "location with stagingBucketName is required");
+
+      checkArgument(getUserDataMapper() != null, "withUserDataMapper() is required");
+
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      checkArgument(getTable() != null, "withTable() is required");
+    }
+
+    private PCollection write(PCollection input, String stagingBucketDir) {
+      SnowflakeService snowflakeService =
+          getSnowflakeService() != null ? getSnowflakeService() : new SnowflakeServiceImpl();
+
+      PCollection files = writeFiles(input, stagingBucketDir);
+
+      files =
+          (PCollection)
+              files.apply("Create list of files to copy", Combine.globally(new Concatenate()));
+
+      return (PCollection)
+          files.apply("Copy files to table", copyToTable(snowflakeService, stagingBucketDir));
+    }
+
+    private PCollection writeFiles(PCollection<T> input, String stagingBucketDir) {
+      class Parse extends DoFn<KV<T, String>, String> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          c.output(c.element().getValue());
+        }
+      }
+
+      PCollection mappedUserData =
+          input
+              .apply(
+                  "Map user data to Objects array",
+                  ParDo.of(new MapUserDataObjectsArrayFn<T>(getUserDataMapper())))
+              .apply("Map Objects array to CSV lines", ParDo.of(new MapObjectsArrayToCsvFn()))
+              .setCoder(StringUtf8Coder.of());
+
+      WriteFilesResult filesResult =
+          (WriteFilesResult)
+              mappedUserData.apply(
+                  "Write files to specified location",
+                  FileIO.write()
+                      .via((FileIO.Sink) new CSVSink())
+                      .to(stagingBucketDir)
+                      .withPrefix(getFileNameTemplate())
+                      .withSuffix(".csv")
+                      .withPrefix(UUID.randomUUID().toString().subSequence(0, 8).toString())
+                      .withCompression(Compression.GZIP));
+
+      return (PCollection)
+          filesResult
+              .getPerDestinationOutputFilenames()
+              .apply("Parse KV filenames to Strings", ParDo.of(new Parse()));
+    }
+
+    private ParDo.SingleOutput<Object, Object> copyToTable(
+        SnowflakeService snowflakeService, String stagingBucketDir) {
+      return ParDo.of(
+          new CopyToTableFn<>(
+              getDataSourceProviderFn(),
+              getTable(),
+              getQuery(),
+              stagingBucketDir,
+              getLocation(),
+              getWriteDisposition(),
+              snowflakeService));
+    }
+  }
+
+  public static class Concatenate extends Combine.CombineFn<String, List<String>, List<String>> {

Review comment:
       Thank you @RyanSkraba - it was challenging, but I think I managed to change it with `Reify` :) After testing, it seems to work as expected. Could you check whether this is the solution you expected?







[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r432456811



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -339,10 +360,31 @@
       emptyCollection
           .apply(Wait.on(output))
           .apply(ParDo.of(new CleanTmpFilesFromGcsFn(stagingBucketDir)));
-
       return output;
     }
 
+    private void checkArguments(Location loc) {
+      // Either table or query is required. If query is present, it's being used, table is used
+      // otherwise
+      checkArgument(loc != null, "via() is required");
+      checkArgument(
+          loc.getStorageIntegrationName() != null,

Review comment:
       1. I think it is more readable as a separate method, but I agree with you that `is odd -- the loc configuration is passed as a parameter while all the rest of the configurations are accessed as methods.` That's why I changed the `checkArguments` method to stop accepting the location as an argument.
    
    2. `SnowflakeIO.Read` requires `storageIntegrationName` to always be set, but `SnowflakeIO.Write` doesn't require that.  The null check in `SnowflakeServiceImpl` corresponds to the `SnowflakeIO.Write` path.







[GitHub] [beam] pabloem commented on pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11794:
URL: https://github.com/apache/beam/pull/11794#issuecomment-636243814


   retest this please





[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r432407862



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/enums/package-info.java
##########
@@ -0,0 +1,2 @@
+/** Snowflake IO data types. */

Review comment:
       Thanks for spotting this 👍 







[GitHub] [beam] kkucharc commented on pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
kkucharc commented on pull request #11794:
URL: https://github.com/apache/beam/pull/11794#issuecomment-648642561


   Run Java PreCommit





[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r432409517



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakePipelineOptions.java
##########
@@ -111,23 +111,13 @@
 
   void setLoginTimeout(String loginTimeout);
 
-  @Description("External location name to connect to.")
-  String getExternalLocation();
-
-  void setExternalLocation(String externalLocation);
-
-  @Description("Temporary GCS bucket name")
+  @Description("Temporary GCS bucket name.")
   String getStagingBucketName();
 
   void setStagingBucketName(String stagingBucketName);
 
-  @Description("Storage integration - required in case the external stage is not specified.")
-  String getStorageIntegration();
-
-  void setStorageIntegration(String integration);
-
-  @Description("Stage name. Optional.")
-  String getStage();
+  @Description("Storage integration name")
+  String getStorageIntegrationName();

Review comment:
       Yes, I'm totally aware of that. The `--stage` option is not used anywhere in the codebase.







[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r432429729



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -447,6 +494,346 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #write()}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract Location getLocation();
+
+    @Nullable
+    abstract String getFileNameTemplate();
+
+    @Nullable
+    abstract WriteDisposition getWriteDisposition();
+
+    @Nullable
+    abstract UserDataMapper getUserDataMapper();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setLocation(Location location);
+
+      abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
+
+      abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+
+      abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Write<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Write<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Write<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A table name to be written in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Write<T> to(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Write<T> withQueryTransformation(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A location object which contains connection config between Snowflake and GCP.
+     *
+     * @param location - an instance of {@link Location}.
+     */
+    public Write<T> via(Location location) {
+      return toBuilder().setLocation(location).build();
+    }
+
+    /**
+     * A template name for files saved to GCP.
+     *
+     * @param fileNameTemplate - String with template name for files.
+     */
+    public Write<T> withFileNameTemplate(String fileNameTemplate) {
+      return toBuilder().setFileNameTemplate(fileNameTemplate).build();
+    }
+
+    /**
+     * User-defined function mapping user data into CSV lines.
+     *
+     * @param userDataMapper - an instance of {@link UserDataMapper}.
+     */
+    public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+      return toBuilder().setUserDataMapper(userDataMapper).build();
+    }
+
+    /**
+     * A disposition to be used during writing to table phase.
+     *
+     * @param writeDisposition - an instance of {@link WriteDisposition}.
+     */
+    public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
+      return toBuilder().setWriteDisposition(writeDisposition).build();
+    }
+
+    /**
+     * A snowflake service which is supposed to be used. Note: Currently we have {@link
+     * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
+     *
+     * @param snowflakeService - an instance of {@link SnowflakeService}.
+     */
+    public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
+      return toBuilder().setSnowflakeService(snowflakeService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      Location loc = getLocation();
+      checkArguments(loc);
+
+      String stagingBucketDir = String.format("%s/%s/", loc.getStagingBucketName(), WRITE_TMP_PATH);
+
+      PCollection out = write(input, stagingBucketDir);
+      out.setCoder(StringUtf8Coder.of());
+
+      return PDone.in(out.getPipeline());
+    }
+
+    private void checkArguments(Location loc) {
+      checkArgument(loc != null, "via() is required");
+      checkArgument(
+          loc.getStorageIntegrationName() != null,
+          "location with storageIntegrationName is required");

Review comment:
       No, it can't; I explained it in the comment above :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r432453524



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/Location.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import java.io.Serializable;
+
+/**
+ * Used as one of the arguments for {@link org.apache.beam.sdk.io.snowflake.SnowflakeIO} write and
+ * read operations. It keeps information about storage integration and staging bucket name.
+ * Integration name is Snowflake storage integration object created according to Snowflake
+ * documentation for the GCS bucket. Staging bucket name is Google Cloud Storage bucket which in the
+ * case of writing operation will be used to save CSV files which will end up in Snowflake under
+ * “staging_bucket_name/data” path and in the case of reading operation will be used as a temporary
+ * location for storing CSV files named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` which will be removed
+ * automatically once Read operation finishes.
+ */
+public class Location implements Serializable {
+  private String storageIntegrationName;

Review comment:
       Yes, you're right. Thanks for spotting this. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r432437111



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -339,10 +360,31 @@
       emptyCollection
           .apply(Wait.on(output))
           .apply(ParDo.of(new CleanTmpFilesFromGcsFn(stagingBucketDir)));
-
       return output;
     }
 
+    private void checkArguments(Location loc) {
+      // Either table or query is required. If query is present, it's being used, table is used
+      // otherwise
+      checkArgument(loc != null, "via() is required");
+      checkArgument(
+          loc.getStorageIntegrationName() != null,

Review comment:
       I think keeping this part of the code in a separate method makes it more readable.
   
   I agree with you that `the loc configuration is passed as a parameter while all the rest of the configurations are accessed as methods` is odd.
   
   I changed the `checkArguments` method so that it no longer takes the location as an argument.
   
   WDYT?
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r444055571



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -447,6 +513,346 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  /** Implementation of {@link #write()}. */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract String getStorageIntegrationName();
+
+    @Nullable
+    abstract String getStagingBucketName();
+
+    @Nullable
+    abstract String getQuery();
+
+    @Nullable
+    abstract String getFileNameTemplate();
+
+    @Nullable
+    abstract WriteDisposition getWriteDisposition();
+
+    @Nullable
+    abstract UserDataMapper getUserDataMapper();
+
+    @Nullable
+    abstract SnowflakeService getSnowflakeService();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setStorageIntegrationName(String storageIntegrationName);
+
+      abstract Builder<T> setStagingBucketName(String stagingBucketName);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
+
+      abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+
+      abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
+
+      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+
+      abstract Write<T> build();
+    }
+
+    /**
+     * Setting information about Snowflake server.
+     *
+     * @param config - An instance of {@link DataSourceConfiguration}.
+     */
+    public Write<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    /**
+     * Setting function that will provide {@link DataSourceConfiguration} in runtime.
+     *
+     * @param dataSourceProviderFn a {@link SerializableFunction}.
+     */
+    public Write<T> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    /**
+     * A table name to be written in Snowflake.
+     *
+     * @param table - String with the name of the table.
+     */
+    public Write<T> withTable(String table) {
+      return toBuilder().setTable(table).build();
+    }
+
+    /**
+     * Name of the cloud bucket (GCS by now) to use as tmp location of CSVs during COPY statement.
+     *
+     * @param stagingBucketName - String with the name of the bucket.
+     */
+    public Write<T> withStagingBucketName(String stagingBucketName) {
+      return toBuilder().setStagingBucketName(stagingBucketName).build();
+    }
+
+    /**
+     * Name of the Storage Integration in Snowflake to be used. See
+     * https://docs.snowflake.com/en/sql-reference/sql/create-storage-integration.html for
+     * reference.
+     *
+     * @param integrationName - String with the name of the Storage Integration.
+     */
+    public Write<T> withStorageIntegrationName(String integrationName) {
+      return toBuilder().setStorageIntegrationName(integrationName).build();
+    }
+
+    /**
+     * A query to be executed in Snowflake.
+     *
+     * @param query - String with query.
+     */
+    public Write<T> withQueryTransformation(String query) {
+      return toBuilder().setQuery(query).build();
+    }
+
+    /**
+     * A template name for files saved to GCP.
+     *
+     * @param fileNameTemplate - String with template name for files.
+     */
+    public Write<T> withFileNameTemplate(String fileNameTemplate) {
+      return toBuilder().setFileNameTemplate(fileNameTemplate).build();
+    }
+
+    /**
+     * User-defined function mapping user data into CSV lines.
+     *
+     * @param userDataMapper - an instance of {@link UserDataMapper}.
+     */
+    public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+      return toBuilder().setUserDataMapper(userDataMapper).build();
+    }
+
+    /**
+     * A disposition to be used during writing to table phase.
+     *
+     * @param writeDisposition - an instance of {@link WriteDisposition}.
+     */
+    public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
+      return toBuilder().setWriteDisposition(writeDisposition).build();
+    }
+
+    /**
+     * A snowflake service which is supposed to be used. Note: Currently we have {@link
+     * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
+     *
+     * @param snowflakeService - an instance of {@link SnowflakeService}.
+     */
+    public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
+      return toBuilder().setSnowflakeService(snowflakeService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      checkArguments();
+
+      String stagingBucketDir = String.format("%s/%s/", getStagingBucketName(), WRITE_TMP_PATH);
+
+      PCollection<String> out = write(input, stagingBucketDir);
+      out.setCoder(StringUtf8Coder.of());
+
+      return PDone.in(out.getPipeline());
+    }
+
+    private void checkArguments() {
+      checkArgument(getStagingBucketName() != null, "withStagingBucketName is required");
+
+      checkArgument(getUserDataMapper() != null, "withUserDataMapper() is required");
+
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      checkArgument(getTable() != null, "withTable() is required");
+    }
+
+    private PCollection<String> write(PCollection<T> input, String stagingBucketDir) {
+      SnowflakeService snowflakeService =
+          getSnowflakeService() != null ? getSnowflakeService() : new SnowflakeServiceImpl();
+
+      PCollection<String> files = writeFiles(input, stagingBucketDir);
+
+      // Combining PCollection of files as a side input into one list of files
+      ListCoder<String> coder = ListCoder.of(StringUtf8Coder.of());
+      files =
+          (PCollection)
+              files
+                  .getPipeline()
+                  .apply(
+                      Reify.viewInGlobalWindow(
+                          (PCollectionView) files.apply(View.asList()), coder));
+
+      return (PCollection)
+          files.apply("Copy files to table", copyToTable(snowflakeService, stagingBucketDir));
+    }
+
+    private PCollection<String> writeFiles(PCollection<T> input, String stagingBucketDir) {
+
+      PCollection<String> mappedUserData =
+          input
+              .apply(
+                  MapElements.via(
+                      new SimpleFunction<T, Object[]>() {
+                        @Override
+                        public Object[] apply(T element) {
+                          return getUserDataMapper().mapRow(element);
+                        }
+                      }))
+              .apply("Map Objects array to CSV lines", ParDo.of(new MapObjectsArrayToCsvFn()))
+              .setCoder(StringUtf8Coder.of());
+
+      WriteFilesResult filesResult =
+          mappedUserData.apply(
+              "Write files to specified location",
+              FileIO.<String>write()
+                  .via(TextIO.sink())
+                  .to(stagingBucketDir)
+                  .withPrefix(getFileNameTemplate())
+                  .withSuffix(".csv")
+                  .withCompression(Compression.GZIP));
+
+      return (PCollection)
+          filesResult
+              .getPerDestinationOutputFilenames()
+              .apply("Parse KV filenames to Strings", Values.<String>create());
+    }
+
+    private ParDo.SingleOutput<Object, Object> copyToTable(
+        SnowflakeService snowflakeService, String stagingBucketDir) {
+      return ParDo.of(
+          new CopyToTableFn<>(
+              getDataSourceProviderFn(),
+              getTable(),
+              getQuery(),
+              stagingBucketDir,
+              getStorageIntegrationName(),
+              getWriteDisposition(),
+              snowflakeService));
+    }
+  }
+
+  public static class Concatenate extends Combine.CombineFn<String, List<String>, List<String>> {
+    @Override
+    public List<String> createAccumulator() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public List<String> addInput(List<String> mutableAccumulator, String input) {
+      mutableAccumulator.add(String.format("'%s'", input));
+      return mutableAccumulator;
+    }
+
+    @Override
+    public List<String> mergeAccumulators(Iterable<List<String>> accumulators) {
+      List<String> result = createAccumulator();
+      for (List<String> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
+
+    @Override
+    public List<String> extractOutput(List<String> accumulator) {
+      return accumulator;
+    }
+  }
+
+  /**
+   * Custom DoFn that maps {@link Object[]} into CSV line to be saved to Snowflake.
+   *
+   * <p>Adds Snowflake-specific quotations around strings.
+   */
+  private static class MapObjectsArrayToCsvFn extends DoFn<Object[], String> {
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      List<Object> csvItems = new ArrayList<>();
+      for (Object o : context.element()) {
+        if (o instanceof String) {
+          String field = (String) o;
+          field = field.replace("'", "''");
+          field = quoteField(field);
+
+          csvItems.add(field);
+        } else {
+          csvItems.add(o);
+        }
+      }
+      context.output(Joiner.on(",").useForNull("").join(csvItems));

Review comment:
       Thanks for pointing this out. We're going to add it to our feature-improvement list.
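   
   For readers following the thread, a small self-contained illustration (not code from this PR) of the quoting rule that `MapObjectsArrayToCsvFn` applies: embedded single quotes are doubled, String fields are wrapped in single quotes, nulls become empty fields, and other values are written as-is.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   public class CsvQuotingExample {
     public static void main(String[] args) {
       Object[] row = {"O'Reilly", 42, null};
       List<String> csvItems = new ArrayList<>();
       for (Object o : row) {
         if (o instanceof String) {
           // Double embedded single quotes, then wrap the field in single quotes.
           csvItems.add("'" + ((String) o).replace("'", "''") + "'");
         } else {
           // Nulls become empty fields; other values are written verbatim.
           csvItems.add(o == null ? "" : o.toString());
         }
       }
       System.out.println(String.join(",", csvItems)); // prints: 'O''Reilly',42,
     }
   }
   ```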




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r430886885



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/CSVSink.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.Charset;
+import java.util.List;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+
+/** Implementation of {@link org.apache.beam.sdk.io.FileIO.Sink} for writing CSV. */
+public class CSVSink implements FileIO.Sink<String> {

Review comment:
       Yes, we are using [COPY INTO table](https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html#type-csv) with CSV format. 
   
   `SnowflakeIO.write` is constructed in a way that requires the target table to already exist in Snowflake before the write starts.
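   
   As a rough illustration of the mapper contract (the domain type and the generic signature below are assumptions based on how `getUserDataMapper().mapRow(element)` is used in the `Write` diff earlier in this thread, not code from this PR), each element is turned into the `Object[]` that becomes one CSV record staged for `COPY INTO`:
   
   ```java
   // Hypothetical domain type; the target table is assumed to already exist in Snowflake
   // with matching columns, e.g. (ID NUMBER, NAME VARCHAR).
   class Person implements java.io.Serializable {
     final long id;
     final String name;
     Person(long id, String name) { this.id = id; this.name = name; }
   }
   
   // Assuming UserDataMapper is a single-method interface returning an Object[] per element
   // (as its single mapRow usage suggests), a lambda like this maps each Person to a CSV row.
   UserDataMapper<Person> personMapper = (Person p) -> new Object[] {p.id, p.name};
   ```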




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r432427386



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/Location.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import java.io.Serializable;
+
+/**
+ * Used as one of the arguments for {@link org.apache.beam.sdk.io.snowflake.SnowflakeIO} write and
+ * read operations. It keeps information about storage integration and staging bucket name.
+ * Integration name is Snowflake storage integration object created according to Snowflake
+ * documentation for the GCS bucket. Staging bucket name is Google Cloud Storage bucket which in the
+ * case of writing operation will be used to save CSV files which will end up in Snowflake under
+ * “staging_bucket_name/data” path and in the case of reading operation will be used as a temporary
+ * location for storing CSV files named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` which will be removed
+ * automatically once Read operation finishes.
+ */
+public class Location implements Serializable {
+  private String storageIntegrationName;

Review comment:
       Both `storageIntegrationName` and `stagingBucketName` are always required for the connection between Snowflake and GCP.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r432437111



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -339,10 +360,31 @@
       emptyCollection
           .apply(Wait.on(output))
           .apply(ParDo.of(new CleanTmpFilesFromGcsFn(stagingBucketDir)));
-
       return output;
     }
 
+    private void checkArguments(Location loc) {
+      // Either table or query is required. If query is present, it's being used, table is used
+      // otherwise
+      checkArgument(loc != null, "via() is required");
+      checkArgument(
+          loc.getStorageIntegrationName() != null,

Review comment:
       I think keeping this part of the code in a separate method makes it more readable.
   
   I agree with you that `the loc configuration is passed as a parameter while all the rest of the configurations are accessed as methods` is odd.
   
   I changed the `checkArguments` method so that it no longer takes the location as an argument. WDYT?
   
   Yes, `storageIntegrationName` must not be null. Thanks for spotting this :)
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kkucharc commented on pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
kkucharc commented on pull request #11794:
URL: https://github.com/apache/beam/pull/11794#issuecomment-648678063


   Thank you all for your work and effort :) All tests passed, so I'm merging this PR.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on a change in pull request #11794:
URL: https://github.com/apache/beam/pull/11794#discussion_r430886885



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/CSVSink.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.Charset;
+import java.util.List;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+
+/** Implementation of {@link org.apache.beam.sdk.io.FileIO.Sink} for writing CSV. */
+public class CSVSink implements FileIO.Sink<String> {

Review comment:
       Yes, we are using [COPY INTO table](https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html#type-csv) with CSV format. 
   
   Currently, `SnowflakeIO.write` is constructed in a way that requires the target table to already exist in Snowflake before the write starts.
   
   In one of the next Snowflake PRs we're planning to add an option that lets the user pass a table schema.
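   
   For context, a rough end-to-end sketch of wiring the batch write once the target table exists (all lowercase identifiers and literal values below, such as `pipeline`, `dataSourceConfiguration`, `userDataMapper`, the bucket and integration names, are placeholders; only the `with...` method names come from the `Write<T>` API shown earlier in this thread):
   
   ```java
   // dataSourceConfiguration: a SnowflakeIO.DataSourceConfiguration holding account,
   // credentials, database and schema (construction omitted here).
   // userDataMapper: maps each input String to an Object[] CSV row, as discussed above.
   pipeline
       .apply("Sample rows", Create.of("alice", "bob"))
       .apply(
           "Write to Snowflake",
           SnowflakeIO.<String>write()
               .withDataSourceConfiguration(dataSourceConfiguration)
               .withUserDataMapper(userDataMapper)
               .withTable("test_table")                        // must already exist in Snowflake
               .withStagingBucketName("my-staging-bucket")     // GCS bucket for the temporary CSV files
               .withStorageIntegrationName("my_integration")); // Snowflake storage integration name
   ```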




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow edited a comment on pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow edited a comment on pull request #11794:
URL: https://github.com/apache/beam/pull/11794#issuecomment-642150480


   Hey @RyanSkraba, we answered all your comments. Will you find a moment to take another look?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] purbanow commented on pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK

Posted by GitBox <gi...@apache.org>.
purbanow commented on pull request #11794:
URL: https://github.com/apache/beam/pull/11794#issuecomment-632624409


   Hi @RyanSkraba, will you find a moment to review this PR?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org