Posted to commits@beam.apache.org by mw...@apache.org on 2020/08/28 07:46:06 UTC

[beam] branch master updated: [BEAM-9898][BEAM-9899] SnowflakeIO.Write for cross-language with python wrapper and integration test (#12509)

This is an automated email from the ASF dual-hosted git repository.

mwalenia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 78d7636  [BEAM-9898][BEAM-9899] SnowflakeIO.Write for cross-language with python wrapper and integration test (#12509)
78d7636 is described below

commit 78d7636cac9234dbcc7145bb0330275c557c0010
Author: Piotr Szuberski <pi...@polidea.com>
AuthorDate: Fri Aug 28 09:45:24 2020 +0200

    [BEAM-9898][BEAM-9899] SnowflakeIO.Write for cross-language with python wrapper and integration test (#12509)
    
    Add:
    
    * External transform registrar for SnowflakeIO.Write for cross-language usage
    * Python wrapper for the registrar above
    * Integration test for the SnowflakeIO Python wrappers (a usage sketch follows below).
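    
    A minimal usage sketch of the new Python wrapper (server, credentials, bucket
    and table names are placeholders; it assumes the default expansion service
    built from the Snowflake expansion-service jar and a portable runner such as
    Flink):
    
        import apache_beam as beam
        from apache_beam.io.snowflake import CreateDisposition
        from apache_beam.io.snowflake import WriteDisposition
        from apache_beam.io.snowflake import WriteToSnowflake
    
        # Placeholder schema: one nullable varchar column.
        TABLE_SCHEMA = (
            '{"schema":[{"dataType":{"type":"varchar","length":100},'
            '"name":"text_column","nullable":true}]}')
    
        with beam.Pipeline() as p:
            _ = (
                p
                | beam.Create([['hello'], ['world']])
                # Each element is mapped to a list of bytes, one value per column.
                | WriteToSnowflake(
                    server_name='https://account.region.gcp.snowflakecomputing.com',
                    schema='PUBLIC',
                    database='TEST_DB',
                    staging_bucket_name='gs://bucket/',
                    storage_integration_name='integration',
                    create_disposition=CreateDisposition.CREATE_IF_NEEDED,
                    write_disposition=WriteDisposition.APPEND,
                    table_schema=TABLE_SCHEMA,
                    user_data_mapper=lambda row: [s.encode('utf-8') for s in row],
                    username='user',
                    password='password',
                    table='TEST_TABLE'))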
---
 .../io/snowflake/expansion-service/build.gradle    |   3 +-
 .../credentials/SnowflakeCredentialsFactory.java   |   4 +-
 ...ration.java => CrossLanguageConfiguration.java} |  20 +-
 ...nowflakeReadRegistrar.java => ReadBuilder.java} |  62 ++-
 .../crosslanguage/SnowflakeTransformRegistrar.java |  42 ++
 .../io/snowflake/crosslanguage/WriteBuilder.java   | 105 +++++
 .../sdk/io/snowflake/data/SnowflakeDataType.java   |  54 +++
 sdks/python/apache_beam/io/external/snowflake.py   | 204 ---------
 .../io/external/xlang_snowflakeio_it_test.py       | 269 ++++++++++++
 sdks/python/apache_beam/io/snowflake.py            | 474 +++++++++++++++++++++
 10 files changed, 991 insertions(+), 246 deletions(-)

diff --git a/sdks/java/io/snowflake/expansion-service/build.gradle b/sdks/java/io/snowflake/expansion-service/build.gradle
index 8a6ea6c..c6e14e1 100644
--- a/sdks/java/io/snowflake/expansion-service/build.gradle
+++ b/sdks/java/io/snowflake/expansion-service/build.gradle
@@ -20,7 +20,8 @@ apply plugin: 'org.apache.beam.module'
 apply plugin: 'application'
 mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService"
 
-applyJavaNature(enableChecker:false,
+applyJavaNature(
+  enableChecker:true,
   automaticModuleName: 'org.apache.beam.sdk.io.expansion.service',
   exportJavadoc: false,
   validateShadowJar: false,
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java
index 2b45dc1..765f6dc 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java
@@ -18,7 +18,7 @@
 package org.apache.beam.sdk.io.snowflake.credentials;
 
 import org.apache.beam.sdk.io.snowflake.SnowflakePipelineOptions;
-import org.apache.beam.sdk.io.snowflake.crosslanguage.SnowflakeReadRegistrar;
+import org.apache.beam.sdk.io.snowflake.crosslanguage.CrossLanguageConfiguration;
 
 /**
  * Factory class for creating implementations of {@link SnowflakeCredentials} from {@link
@@ -38,7 +38,7 @@ public class SnowflakeCredentialsFactory {
     throw new RuntimeException("Can't get credentials from Options");
   }
 
-  public static SnowflakeCredentials of(SnowflakeReadRegistrar.ReadConfiguration c) {
+  public static SnowflakeCredentials of(CrossLanguageConfiguration c) {
     if (oauthOptionsAvailable(c.getOAuthToken())) {
       return new OAuthTokenSnowflakeCredentials(c.getOAuthToken());
     } else if (usernamePasswordOptionsAvailable(c.getUsername(), c.getPassword())) {
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/Configuration.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/CrossLanguageConfiguration.java
similarity index 89%
rename from sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/Configuration.java
rename to sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/CrossLanguageConfiguration.java
index 38162ae..920f343 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/Configuration.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/CrossLanguageConfiguration.java
@@ -18,7 +18,7 @@
 package org.apache.beam.sdk.io.snowflake.crosslanguage;
 
 /** Parameters abstract class to expose the transforms to an external SDK. */
-public abstract class Configuration {
+public abstract class CrossLanguageConfiguration {
   private String serverName;
   private String username;
   private String password;
@@ -27,6 +27,8 @@ public abstract class Configuration {
   private String oAuthToken;
   private String database;
   private String schema;
+  private String role;
+  private String warehouse;
   private String table;
   private String query;
   private String stagingBucketName;
@@ -96,6 +98,22 @@ public abstract class Configuration {
     this.schema = schema;
   }
 
+  public String getRole() {
+    return role;
+  }
+
+  public void setRole(String role) {
+    this.role = role;
+  }
+
+  public String getWarehouse() {
+    return warehouse;
+  }
+
+  public void setWarehouse(String warehouse) {
+    this.warehouse = warehouse;
+  }
+
   public String getTable() {
     return table;
   }
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeReadRegistrar.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/ReadBuilder.java
similarity index 51%
rename from sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeReadRegistrar.java
rename to sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/ReadBuilder.java
index 1e7be0f..c811cf2 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeReadRegistrar.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/ReadBuilder.java
@@ -17,14 +17,12 @@
  */
 package org.apache.beam.sdk.io.snowflake.crosslanguage;
 
-import com.google.auto.service.AutoService;
 import java.io.Serializable;
 import java.nio.charset.Charset;
-import java.util.Map;
 import javax.sql.DataSource;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
 import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
 import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentials;
 import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory;
@@ -33,47 +31,35 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 
-/** Exposes {@link SnowflakeIO.Read} as an external transform for cross-language usage. */
-@Experimental
-@AutoService(ExternalTransformRegistrar.class)
-public final class SnowflakeReadRegistrar implements ExternalTransformRegistrar {
-
-  public static final String URN = "beam:external:java:snowflake:read:v1";
-
-  @Override
-  public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
-    return ImmutableMap.of(URN, ReadBuilder.class);
-  }
+@Experimental(Kind.PORTABILITY)
+public class ReadBuilder
+    implements ExternalTransformBuilder<ReadBuilder.Configuration, PBegin, PCollection<byte[]>> {
 
   /** Parameters class to expose the transform to an external SDK. */
-  public static class ReadConfiguration extends Configuration {}
+  public static class Configuration extends CrossLanguageConfiguration {}
 
-  public static class ReadBuilder
-      implements ExternalTransformBuilder<ReadConfiguration, PBegin, PCollection<byte[]>> {
-    public ReadBuilder() {}
-
-    @Override
-    public PTransform<PBegin, PCollection<byte[]>> buildExternal(ReadConfiguration c) {
-      SnowflakeCredentials credentials = SnowflakeCredentialsFactory.of(c);
+  @Override
+  public PTransform<PBegin, PCollection<byte[]>> buildExternal(Configuration c) {
+    SnowflakeCredentials credentials = SnowflakeCredentialsFactory.of(c);
 
-      SerializableFunction<Void, DataSource> dataSourceSerializableFunction =
-          SnowflakeIO.DataSourceProviderFromDataSourceConfiguration.of(
-              SnowflakeIO.DataSourceConfiguration.create(credentials)
-                  .withServerName(c.getServerName())
-                  .withDatabase(c.getDatabase())
-                  .withSchema(c.getSchema()));
+    SerializableFunction<Void, DataSource> dataSourceSerializableFunction =
+        SnowflakeIO.DataSourceProviderFromDataSourceConfiguration.of(
+            SnowflakeIO.DataSourceConfiguration.create(credentials)
+                .withServerName(c.getServerName())
+                .withDatabase(c.getDatabase())
+                .withSchema(c.getSchema())
+                .withRole(c.getRole())
+                .withWarehouse(c.getWarehouse()));
 
-      return SnowflakeIO.<byte[]>read()
-          .withStorageIntegrationName(c.getStorageIntegrationName())
-          .withStagingBucketName(c.getStagingBucketName())
-          .withDataSourceProviderFn(dataSourceSerializableFunction)
-          .withCsvMapper(CsvMapper.getCsvMapper())
-          .withCoder(ByteArrayCoder.of())
-          .fromTable(c.getTable())
-          .fromQuery(c.getQuery());
-    }
+    return SnowflakeIO.<byte[]>read()
+        .withStorageIntegrationName(c.getStorageIntegrationName())
+        .withStagingBucketName(c.getStagingBucketName())
+        .withDataSourceProviderFn(dataSourceSerializableFunction)
+        .withCsvMapper(CsvMapper.getCsvMapper())
+        .withCoder(ByteArrayCoder.of())
+        .fromTable(c.getTable())
+        .fromQuery(c.getQuery());
   }
 
   private static class CsvMapper implements Serializable {
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeTransformRegistrar.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeTransformRegistrar.java
new file mode 100644
index 0000000..e69219a
--- /dev/null
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeTransformRegistrar.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake.crosslanguage;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
+import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Exposes {@link SnowflakeIO.Read} and {@link SnowflakeIO.Write} as external transforms for
+ * cross-language usage.
+ */
+@Experimental(Kind.PORTABILITY)
+@AutoService(ExternalTransformRegistrar.class)
+public final class SnowflakeTransformRegistrar implements ExternalTransformRegistrar {
+  public static final String READ_URN = "beam:external:java:snowflake:read:v1";
+  public static final String WRITE_URN = "beam:external:java:snowflake:write:v1";
+
+  @Override
+  public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
+    return ImmutableMap.of(READ_URN, new ReadBuilder(), WRITE_URN, new WriteBuilder());
+  }
+}
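
The Python wrappers added later in this commit reach these builders through the
URNs registered above. A condensed sketch of the expansion call (the payload
NamedTuple and the expansion service address 'localhost:8097' are assumptions,
e.g. a locally started service or the Flink Job Server's built-in one):

    from apache_beam.transforms.external import ExternalTransform
    from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder

    # Must match SnowflakeTransformRegistrar.WRITE_URN on the Java side.
    WRITE_URN = 'beam:external:java:snowflake:write:v1'

    def build_external_write(params, expansion_service='localhost:8097'):
        # `params` is a NamedTuple such as WriteToSnowflakeSchema defined in
        # sdks/python/apache_beam/io/snowflake.py.
        return ExternalTransform(
            WRITE_URN,
            NamedTupleBasedPayloadBuilder(params),
            expansion_service)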
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/WriteBuilder.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/WriteBuilder.java
new file mode 100644
index 0000000..9ca4c73
--- /dev/null
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/WriteBuilder.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake.crosslanguage;
+
+import java.io.IOException;
+import java.util.List;
+import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
+import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory;
+import org.apache.beam.sdk.io.snowflake.data.SnowflakeTableSchema;
+import org.apache.beam.sdk.io.snowflake.enums.CreateDisposition;
+import org.apache.beam.sdk.io.snowflake.enums.WriteDisposition;
+import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+@Experimental(Kind.PORTABILITY)
+public class WriteBuilder
+    implements ExternalTransformBuilder<WriteBuilder.Configuration, PCollection<byte[]>, PDone> {
+
+  /** Parameters class to expose the transform to an external SDK. */
+  public static class Configuration extends CrossLanguageConfiguration {
+    private SnowflakeTableSchema tableSchema;
+    private CreateDisposition createDisposition;
+    private WriteDisposition writeDisposition;
+
+    public void setTableSchema(String tableSchema) {
+      ObjectMapper mapper = new ObjectMapper();
+
+      try {
+        this.tableSchema = mapper.readValue(tableSchema, SnowflakeTableSchema.class);
+      } catch (IOException e) {
+        throw new RuntimeException("Format of provided table schema is invalid");
+      }
+    }
+
+    public void setCreateDisposition(String createDisposition) {
+      this.createDisposition = CreateDisposition.valueOf(createDisposition);
+    }
+
+    public void setWriteDisposition(String writeDisposition) {
+      this.writeDisposition = WriteDisposition.valueOf(writeDisposition);
+    }
+
+    public SnowflakeTableSchema getTableSchema() {
+      return tableSchema;
+    }
+
+    public CreateDisposition getCreateDisposition() {
+      return createDisposition;
+    }
+
+    public WriteDisposition getWriteDisposition() {
+      return writeDisposition;
+    }
+  }
+
+  @Override
+  public PTransform<PCollection<byte[]>, PDone> buildExternal(Configuration c) {
+    SnowflakeCredentials credentials = SnowflakeCredentialsFactory.of(c);
+
+    SnowflakeIO.DataSourceConfiguration dataSourceConfiguration =
+        SnowflakeIO.DataSourceConfiguration.create(credentials)
+            .withServerName(c.getServerName())
+            .withDatabase(c.getDatabase())
+            .withSchema(c.getSchema())
+            .withRole(c.getRole())
+            .withWarehouse(c.getWarehouse());
+
+    return SnowflakeIO.<byte[]>write()
+        .withDataSourceConfiguration(dataSourceConfiguration)
+        .withStorageIntegrationName(c.getStorageIntegrationName())
+        .withStagingBucketName(c.getStagingBucketName())
+        .withTableSchema(c.getTableSchema())
+        .withCreateDisposition(c.getCreateDisposition())
+        .withWriteDisposition(c.getWriteDisposition())
+        .withUserDataMapper(getStringCsvMapper())
+        .withQueryTransformation(c.getQuery())
+        .to(c.getTable());
+  }
+
+  private static SnowflakeIO.UserDataMapper<List<byte[]>> getStringCsvMapper() {
+    return (SnowflakeIO.UserDataMapper<List<byte[]>>)
+        recordLine -> recordLine.stream().map(String::new).toArray();
+  }
+}
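
On the Java side, getStringCsvMapper turns each incoming record (a list of byte
arrays) into one CSV line, so the Python wrapper's user_data_mapper has to emit
exactly one bytes value per column. A sketch mirroring the mapper used in the
integration test added later in this commit (TestRow and its fields come from
that test):

    import binascii
    from typing import ByteString, NamedTuple

    class TestRow(NamedTuple):
        number_column: int
        boolean_column: bool
        bytes_column: ByteString

    # One bytes value per Snowflake column, matching getStringCsvMapper's input.
    def user_data_mapper(test_row):
        return [
            str(test_row.number_column).encode('utf-8'),
            str(test_row.boolean_column).encode('utf-8'),
            binascii.hexlify(test_row.bytes_column),
        ]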
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/data/SnowflakeDataType.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/data/SnowflakeDataType.java
index 4b8caf5..babe7dc 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/data/SnowflakeDataType.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/data/SnowflakeDataType.java
@@ -18,8 +18,62 @@
 package org.apache.beam.sdk.io.snowflake.data;
 
 import java.io.Serializable;
+import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonSubTypes;
+import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.beam.sdk.io.snowflake.data.datetime.SnowflakeDate;
+import org.apache.beam.sdk.io.snowflake.data.datetime.SnowflakeDateTime;
+import org.apache.beam.sdk.io.snowflake.data.datetime.SnowflakeTime;
+import org.apache.beam.sdk.io.snowflake.data.datetime.SnowflakeTimestamp;
+import org.apache.beam.sdk.io.snowflake.data.datetime.SnowflakeTimestampLTZ;
+import org.apache.beam.sdk.io.snowflake.data.datetime.SnowflakeTimestampNTZ;
+import org.apache.beam.sdk.io.snowflake.data.datetime.SnowflakeTimestampTZ;
+import org.apache.beam.sdk.io.snowflake.data.logical.SnowflakeBoolean;
+import org.apache.beam.sdk.io.snowflake.data.numeric.SnowflakeDecimal;
+import org.apache.beam.sdk.io.snowflake.data.numeric.SnowflakeDouble;
+import org.apache.beam.sdk.io.snowflake.data.numeric.SnowflakeFloat;
+import org.apache.beam.sdk.io.snowflake.data.numeric.SnowflakeInteger;
+import org.apache.beam.sdk.io.snowflake.data.numeric.SnowflakeNumber;
+import org.apache.beam.sdk.io.snowflake.data.numeric.SnowflakeNumeric;
+import org.apache.beam.sdk.io.snowflake.data.numeric.SnowflakeReal;
+import org.apache.beam.sdk.io.snowflake.data.structured.SnowflakeArray;
+import org.apache.beam.sdk.io.snowflake.data.structured.SnowflakeObject;
+import org.apache.beam.sdk.io.snowflake.data.structured.SnowflakeVariant;
+import org.apache.beam.sdk.io.snowflake.data.text.SnowflakeBinary;
+import org.apache.beam.sdk.io.snowflake.data.text.SnowflakeChar;
+import org.apache.beam.sdk.io.snowflake.data.text.SnowflakeString;
+import org.apache.beam.sdk.io.snowflake.data.text.SnowflakeText;
+import org.apache.beam.sdk.io.snowflake.data.text.SnowflakeVarBinary;
+import org.apache.beam.sdk.io.snowflake.data.text.SnowflakeVarchar;
 
 /** Interface for data types to provide SQLs for themselves. */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
+@JsonSubTypes({
+  @Type(value = SnowflakeDate.class, name = "date"),
+  @Type(value = SnowflakeDateTime.class, name = "datetime"),
+  @Type(value = SnowflakeTime.class, name = "time"),
+  @Type(value = SnowflakeTimestamp.class, name = "timestamp"),
+  @Type(value = SnowflakeTimestampLTZ.class, name = "timestamp_ltz"),
+  @Type(value = SnowflakeTimestampNTZ.class, name = "timestamp_ntz"),
+  @Type(value = SnowflakeTimestampTZ.class, name = "timestamp_tz"),
+  @Type(value = SnowflakeBoolean.class, name = "boolean"),
+  @Type(value = SnowflakeDecimal.class, name = "decimal"),
+  @Type(value = SnowflakeDouble.class, name = "double"),
+  @Type(value = SnowflakeFloat.class, name = "float"),
+  @Type(value = SnowflakeInteger.class, name = "integer"),
+  @Type(value = SnowflakeNumber.class, name = "number"),
+  @Type(value = SnowflakeNumeric.class, name = "numeric"),
+  @Type(value = SnowflakeReal.class, name = "real"),
+  @Type(value = SnowflakeArray.class, name = "array"),
+  @Type(value = SnowflakeObject.class, name = "object"),
+  @Type(value = SnowflakeVariant.class, name = "variant"),
+  @Type(value = SnowflakeBinary.class, name = "binary"),
+  @Type(value = SnowflakeChar.class, name = "char"),
+  @Type(value = SnowflakeString.class, name = "string"),
+  @Type(value = SnowflakeText.class, name = "text"),
+  @Type(value = SnowflakeVarBinary.class, name = "varbinary"),
+  @Type(value = SnowflakeVarchar.class, name = "varchar"),
+})
 public interface SnowflakeDataType extends Serializable {
   String sql();
 }
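
The names registered in @JsonSubTypes above are the "type" discriminators
accepted in the JSON table schema that WriteBuilder.Configuration.setTableSchema
deserializes into a SnowflakeTableSchema. A small schema exercising a few of
them (the same one used by the integration test added below):

    # Accepted as the table_schema argument of the Python WriteToSnowflake wrapper.
    SCHEMA_STRING = """
    {"schema":[
        {"dataType":{"type":"integer","precision":38,"scale":0},"name":"number_column","nullable":false},
        {"dataType":{"type":"boolean"},"name":"boolean_column","nullable":false},
        {"dataType":{"type":"binary","size":100},"name":"bytes_column","nullable":true}
    ]}
    """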
diff --git a/sdks/python/apache_beam/io/external/snowflake.py b/sdks/python/apache_beam/io/external/snowflake.py
deleted file mode 100644
index e7ffa6a..0000000
--- a/sdks/python/apache_beam/io/external/snowflake.py
+++ /dev/null
@@ -1,204 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Snowflake transforms tested against Flink portable runner.
-
-  **Setup**
-
-  Transforms provided in this module are cross-language transforms
-  implemented in the Beam Java SDK. During the pipeline construction, Python SDK
-  will connect to a Java expansion service to expand these transforms.
-  To facilitate this, a small amount of setup is needed before using these
-  transforms in a Beam Python pipeline.
-
-  There are several ways to setup cross-language Snowflake transforms.
-
-  * Option 1: use the default expansion service
-  * Option 2: specify a custom expansion service
-
-  See below for details regarding each of these options.
-
-  *Option 1: Use the default expansion service*
-
-  This is the recommended and easiest setup option for using Python Snowflake
-  transforms.This option requires following pre-requisites
-  before running the Beam pipeline.
-
-  * Install Java runtime in the computer from where the pipeline is constructed
-    and make sure that 'java' command is available.
-
-  In this option, Python SDK will either download (for released Beam version) or
-  build (when running from a Beam Git clone) a expansion service jar and use
-  that to expand transforms. Currently Snowflake transforms use the
-  'beam-sdks-java-io-expansion-service' jar for this purpose.
-
-  *Option 2: specify a custom expansion service*
-
-  In this option, you startup your own expansion service and provide that as
-  a parameter when using the transforms provided in this module.
-
-  This option requires following pre-requisites before running the Beam
-  pipeline.
-
-  * Startup your own expansion service.
-  * Update your pipeline to provide the expansion service address when
-    initiating Snowflake transforms provided in this module.
-
-  Flink Users can use the built-in Expansion Service of the Flink Runner's
-  Job Server. If you start Flink's Job Server, the expansion service will be
-  started on port 8097. For a different address, please set the
-  expansion_service parameter.
-
-  **More information**
-
-  For more information regarding cross-language transforms see:
-  - https://beam.apache.org/roadmap/portability/
-
-  For more information specific to Flink runner see:
-  - https://beam.apache.org/documentation/runners/flink/
-"""
-
-# pytype: skip-file
-
-from __future__ import absolute_import
-
-import typing
-
-from past.builtins import unicode
-
-import apache_beam as beam
-from apache_beam.transforms.external import BeamJarExpansionService
-from apache_beam.transforms.external import ExternalTransform
-from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
-
-ReadFromSnowflakeSchema = typing.NamedTuple(
-    'ReadFromSnowflakeSchema',
-    [
-        ('server_name', unicode),
-        ('schema', unicode),
-        ('database', unicode),
-        ('staging_bucket_name', unicode),
-        ('storage_integration_name', unicode),
-        ('username', typing.Optional[unicode]),
-        ('password', typing.Optional[unicode]),
-        ('private_key_path', typing.Optional[unicode]),
-        ('private_key_passphrase', typing.Optional[unicode]),
-        ('o_auth_token', typing.Optional[unicode]),
-        ('table', typing.Optional[unicode]),
-        ('query', typing.Optional[unicode]),
-    ])
-
-
-def default_io_expansion_service():
-  return BeamJarExpansionService(
-      'sdks:java:io:snowflake:expansion-service:shadowJar')
-
-
-class ReadFromSnowflake(beam.PTransform):
-  """
-    An external PTransform which reads from Snowflake.
-  """
-
-  URN = 'beam:external:java:snowflake:read:v1'
-
-  def __init__(
-      self,
-      server_name,
-      schema,
-      database,
-      staging_bucket_name,
-      storage_integration_name,
-      csv_mapper,
-      username=None,
-      password=None,
-      private_key_path=None,
-      private_key_passphrase=None,
-      o_auth_token=None,
-      table=None,
-      query=None,
-      expansion_service=None):
-    """
-    Initializes a read operation from Snowflake.
-
-    Required parameters:
-
-    :param server_name: full Snowflake server name with the following format
-         account.region.gcp.snowflakecomputing.com.
-    :param schema: name of the Snowflake schema in the database to use.
-    :param database: name of the Snowflake database to use.
-    :param staging_bucket_name: name of the Google Cloud Storage bucket.::
-        Bucket will be used as a temporary location for storing CSV files.
-        Those temporary directories will be named
-        'sf_copy_csv_DATE_TIME_RANDOMSUFFIX'
-        and they will be removed automatically once Read operation finishes.
-    :param storage_integration_name: is the name of storage integration
-        object created according to Snowflake documentation.
-    :param csv_mapper: specifies a function which must translate
-        user-defined object to array of strings.
-        SnowflakeIO uses a COPY INTO <location> statement to move data from
-        a Snowflake table to Google Cloud Storage as CSV files.These files
-        are then downloaded via FileIO and processed line by line.
-        Each line is split into an array of Strings using the OpenCSV
-        The csv_mapper function job is to give the user the possibility to
-        convert the array of Strings to a user-defined type,
-        ie. GenericRecord for Avro or Parquet files, or custom objects.
-        Example:
-        def csv_mapper(strings_array)
-        return User(strings_array[0], int(strings_array[1])))
-    :param table: specifies a Snowflake table name.
-    :param query: specifies a Snowflake custom SQL query.
-    :param expansion_service: specifies URL of expansion service.
-
-    Authentication parameters:
-
-    :param username: specifies username for
-        username/password authentication method.
-    :param password: specifies password for
-        username/password authentication method.
-    :param private_key_path: specifies a private key file for
-        key/ pair authentication method.
-    :param private_key_passphrase: specifies password for
-        key/ pair authentication method.
-    :param o_auth_token: specifies access token for
-        OAuth authentication method.
-    """
-    self.params = ReadFromSnowflakeSchema(
-        server_name=server_name,
-        schema=schema,
-        database=database,
-        staging_bucket_name=staging_bucket_name,
-        storage_integration_name=storage_integration_name,
-        username=username,
-        password=password,
-        private_key_path=private_key_path,
-        private_key_passphrase=private_key_passphrase,
-        o_auth_token=o_auth_token,
-        table=table,
-        query=query)
-    self.csv_mapper = csv_mapper
-    self.expansion_service = expansion_service or default_io_expansion_service()
-
-  def expand(self, pbegin):
-    return (
-        pbegin
-        | ExternalTransform(
-            self.URN,
-            NamedTupleBasedPayloadBuilder(self.params),
-            self.expansion_service,
-        )
-        | 'CSV to array mapper' >> beam.Map(lambda csv: csv.split(b','))
-        | 'CSV mapper' >> beam.Map(self.csv_mapper))
diff --git a/sdks/python/apache_beam/io/external/xlang_snowflakeio_it_test.py b/sdks/python/apache_beam/io/external/xlang_snowflakeio_it_test.py
new file mode 100644
index 0000000..b90c021
--- /dev/null
+++ b/sdks/python/apache_beam/io/external/xlang_snowflakeio_it_test.py
@@ -0,0 +1,269 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for cross-language Snowflake IO operations.
+
+Example run:
+
+python setup.py nosetests --tests=apache_beam.io.external.xlang_snowflakeio_it_test \
+--test-pipeline-options="
+  --server_name=<SNOWFLAKE_SERVER_NAME>
+  --username=<SNOWFLAKE_USERNAME>
+  --password=<SNOWFLAKE_PASSWORD>
+  --private_key_path=<PATH_TO_PRIVATE_KEY>
+  --private_key_passphrase=<PASSWORD_TO_PRIVATE_KEY>
+  --o_auth_token=<TOKEN>
+  --staging_bucket_name=<GCP_BUCKET_PATH>
+  --storage_integration_name=<SNOWFLAKE_STORAGE_INTEGRATION_NAME>
+  --database=<DATABASE>
+  --schema=<SCHEMA>
+  --role=<ROLE>
+  --warehouse=<WAREHOUSE>
+  --table=<TABLE_NAME>
+  --runner=FlinkRunner"
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import binascii
+import logging
+import unittest
+from typing import ByteString
+from typing import NamedTuple
+
+import apache_beam as beam
+from apache_beam import coders
+from apache_beam.io.snowflake import CreateDisposition
+from apache_beam.io.snowflake import ReadFromSnowflake
+from apache_beam.io.snowflake import WriteDisposition
+from apache_beam.io.snowflake import WriteToSnowflake
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
+try:
+  from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+except ImportError:
+  GCSFileSystem = None
+# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
+
+SCHEMA_STRING = """
+{"schema":[
+    {"dataType":{"type":"integer","precision":38,"scale":0},"name":"number_column","nullable":false},
+    {"dataType":{"type":"boolean"},"name":"boolean_column","nullable":false},
+    {"dataType":{"type":"binary","size":100},"name":"bytes_column","nullable":true}
+]}
+"""
+
+TestRow = NamedTuple(
+    'TestRow',
+    [
+        ('number_column', int),
+        ('boolean_column', bool),
+        ('bytes_column', ByteString),
+    ])
+
+coders.registry.register_coder(TestRow, coders.RowCoder)
+
+NUM_RECORDS = 100
+
+
+@unittest.skipIf(GCSFileSystem is None, 'GCP dependencies are not installed')
+@unittest.skipIf(
+    TestPipeline().get_option('server_name') is None,
+    'Snowflake IT test requires external configuration to be run.')
+class SnowflakeTest(unittest.TestCase):
+  def test_snowflake_write_read(self):
+    self.run_write()
+    self.run_read()
+
+  def run_write(self):
+    def user_data_mapper(test_row):
+      return [
+          str(test_row.number_column).encode('utf-8'),
+          str(test_row.boolean_column).encode('utf-8'),
+          binascii.hexlify(test_row.bytes_column),
+      ]
+
+    with TestPipeline(options=PipelineOptions(self.pipeline_args)) as p:
+      p.not_use_test_runner_api = True
+      _ = (
+          p
+          | 'Impulse' >> beam.Impulse()
+          | 'Generate' >> beam.FlatMap(lambda x: range(NUM_RECORDS))  # pylint: disable=range-builtin-not-iterating
+          | 'Map to TestRow' >> beam.Map(
+              lambda num: TestRow(
+                  num, num % 2 == 0, b"test" + str(num).encode()))
+          | WriteToSnowflake(
+              server_name=self.server_name,
+              username=self.username,
+              password=self.password,
+              o_auth_token=self.o_auth_token,
+              private_key_path=self.private_key_path,
+              private_key_passphrase=self.private_key_passphrase,
+              schema=self.schema,
+              database=self.database,
+              role=self.role,
+              warehouse=self.warehouse,
+              staging_bucket_name=self.staging_bucket_name,
+              storage_integration_name=self.storage_integration_name,
+              create_disposition=CreateDisposition.CREATE_IF_NEEDED,
+              write_disposition=WriteDisposition.TRUNCATE,
+              table_schema=SCHEMA_STRING,
+              user_data_mapper=user_data_mapper,
+              table=self.table,
+              query=None,
+              expansion_service=self.expansion_service,
+          ))
+
+  def run_read(self):
+    def csv_mapper(bytes_array):
+      return TestRow(
+          int(bytes_array[0]),
+          bytes_array[1] == b'true',
+          binascii.unhexlify(bytes_array[2]))
+
+    with TestPipeline(options=PipelineOptions(self.pipeline_args)) as p:
+      result = (
+          p
+          | ReadFromSnowflake(
+              server_name=self.server_name,
+              username=self.username,
+              password=self.password,
+              o_auth_token=self.o_auth_token,
+              private_key_path=self.private_key_path,
+              private_key_passphrase=self.private_key_passphrase,
+              schema=self.schema,
+              database=self.database,
+              role=self.role,
+              warehouse=self.warehouse,
+              staging_bucket_name=self.staging_bucket_name,
+              storage_integration_name=self.storage_integration_name,
+              csv_mapper=csv_mapper,
+              table=self.table,
+              query=None,
+              expansion_service=self.expansion_service,
+          ).with_output_types(TestRow))
+
+      assert_that(
+          result,
+          equal_to([
+              TestRow(i, i % 2 == 0, b'test' + str(i).encode())
+              for i in range(NUM_RECORDS)
+          ]))
+
+  def tearDown(self):
+    GCSFileSystem(pipeline_options=PipelineOptions()) \
+        .delete([self.staging_bucket_name])
+
+  def setUp(self):
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        '--server_name',
+        required=True,
+        help=(
+            'Snowflake server name of the form '
+            'https://<SNOWFLAKE_ACCOUNT_NAME>.snowflakecomputing.com'),
+    )
+    parser.add_argument(
+        '--username',
+        help='Snowflake username',
+    )
+    parser.add_argument(
+        '--password',
+        help='Snowflake password',
+    )
+    parser.add_argument(
+        '--private_key_path',
+        help='Path to private key',
+    )
+    parser.add_argument(
+        '--private_key_passphrase',
+        help='Password to private key',
+    )
+    parser.add_argument(
+        '--o_auth_token',
+        help='OAuth token',
+    )
+    parser.add_argument(
+        '--staging_bucket_name',
+        required=True,
+        help='GCP staging bucket name (must end with a slash)',
+    )
+    parser.add_argument(
+        '--storage_integration_name',
+        required=True,
+        help='Snowflake integration name',
+    )
+    parser.add_argument(
+        '--database',
+        required=True,
+        help='Snowflake database name',
+    )
+    parser.add_argument(
+        '--schema',
+        required=True,
+        help='Snowflake schema name',
+    )
+    parser.add_argument(
+        '--table',
+        required=True,
+        help='Snowflake table name',
+    )
+    parser.add_argument(
+        '--role',
+        help='Snowflake role',
+    )
+    parser.add_argument(
+        '--warehouse',
+        help='Snowflake warehouse name',
+    )
+    parser.add_argument(
+        '--expansion_service',
+        help='Url to externally launched expansion service.',
+    )
+
+    pipeline = TestPipeline()
+    argv = pipeline.get_full_options_as_args()
+
+    known_args, self.pipeline_args = parser.parse_known_args(argv)
+
+    self.server_name = known_args.server_name
+    self.database = known_args.database
+    self.schema = known_args.schema
+    self.table = known_args.table
+    self.username = known_args.username
+    self.password = known_args.password
+    self.private_key_path = known_args.private_key_path
+    self.private_key_passphrase = known_args.private_key_passphrase
+    self.o_auth_token = known_args.o_auth_token
+    self.staging_bucket_name = known_args.staging_bucket_name
+    self.storage_integration_name = known_args.storage_integration_name
+    self.role = known_args.role
+    self.warehouse = known_args.warehouse
+    self.expansion_service = known_args.expansion_service
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
diff --git a/sdks/python/apache_beam/io/snowflake.py b/sdks/python/apache_beam/io/snowflake.py
new file mode 100644
index 0000000..71321ca
--- /dev/null
+++ b/sdks/python/apache_beam/io/snowflake.py
@@ -0,0 +1,474 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Snowflake transforms tested against Flink portable runner.
+
+  **Setup**
+
+  Transforms provided in this module are cross-language transforms
+  implemented in the Beam Java SDK. During the pipeline construction, Python SDK
+  will connect to a Java expansion service to expand these transforms.
+  To facilitate this, a small amount of setup is needed before using these
+  transforms in a Beam Python pipeline.
+
+  There are several ways to set up cross-language Snowflake transforms.
+
+  * Option 1: use the default expansion service
+  * Option 2: specify a custom expansion service
+
+  See below for details regarding each of these options.
+
+  *Option 1: Use the default expansion service*
+
+  This is the recommended and easiest setup option for using Python Snowflake
+  transforms. This option requires the following prerequisites
+  before running the Beam pipeline.
+
+  * Install Java runtime in the computer from where the pipeline is constructed
+    and make sure that 'java' command is available.
+
+  In this option, the Python SDK will either download (for released Beam versions)
+  or build (when running from a Beam Git clone) an expansion service jar and use
+  that to expand transforms. Currently, Snowflake transforms use the
+  'beam-sdks-java-io-snowflake-expansion-service' jar for this purpose.
+
+  *Option 2: specify a custom expansion service*
+
+  In this option, you start up your own expansion service and provide that as
+  a parameter when using the transforms provided in this module.
+
+  This option requires the following prerequisites before running the Beam
+  pipeline.
+
+  * Start up your own expansion service.
+  * Update your pipeline to provide the expansion service address when
+    initiating Snowflake transforms provided in this module.
+
+  Flink Users can use the built-in Expansion Service of the Flink Runner's
+  Job Server. If you start Flink's Job Server, the expansion service will be
+  started on port 8097. For a different address, please set the
+  expansion_service parameter.
+
+  **More information**
+
+  For more information regarding cross-language transforms see:
+  - https://beam.apache.org/roadmap/portability/
+
+  For more information specific to Flink runner see:
+  - https://beam.apache.org/documentation/runners/flink/
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+
+from past.builtins import unicode
+
+import apache_beam as beam
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import ExternalTransform
+from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
+
+__all__ = [
+    'ReadFromSnowflake',
+    'WriteToSnowflake',
+    'WriteDisposition',
+    'CreateDisposition',
+]
+
+
+def default_io_expansion_service():
+  return BeamJarExpansionService(
+      'sdks:java:io:snowflake:expansion-service:shadowJar')
+
+
+ReadFromSnowflakeSchema = NamedTuple(
+    'ReadFromSnowflakeSchema',
+    [
+        ('server_name', unicode),
+        ('schema', unicode),
+        ('database', unicode),
+        ('staging_bucket_name', unicode),
+        ('storage_integration_name', unicode),
+        ('username', Optional[unicode]),
+        ('password', Optional[unicode]),
+        ('private_key_path', Optional[unicode]),
+        ('private_key_passphrase', Optional[unicode]),
+        ('o_auth_token', Optional[unicode]),
+        ('table', Optional[unicode]),
+        ('query', Optional[unicode]),
+        ('role', Optional[unicode]),
+        ('warehouse', Optional[unicode]),
+    ])
+
+
+class ReadFromSnowflake(beam.PTransform):
+  """
+    An external PTransform which reads from Snowflake.
+  """
+
+  URN = 'beam:external:java:snowflake:read:v1'
+
+  def __init__(
+      self,
+      server_name,
+      schema,
+      database,
+      staging_bucket_name,
+      storage_integration_name,
+      csv_mapper,
+      username=None,
+      password=None,
+      private_key_path=None,
+      private_key_passphrase=None,
+      o_auth_token=None,
+      table=None,
+      query=None,
+      role=None,
+      warehouse=None,
+      expansion_service=None):
+    """
+    Initializes a read operation from Snowflake.
+
+    Required parameters:
+
+    :param server_name: full Snowflake server name with the following format
+         https://account.region.gcp.snowflakecomputing.com.
+    :param schema: name of the Snowflake schema in the database to use.
+    :param database: name of the Snowflake database to use.
+    :param staging_bucket_name: name of the Google Cloud Storage bucket.
+        Bucket will be used as a temporary location for storing CSV files.
+        Those temporary directories will be named
+        'sf_copy_csv_DATE_TIME_RANDOMSUFFIX'
+        and they will be removed automatically once Read operation finishes.
+    :param storage_integration_name: is the name of storage integration
+        object created according to Snowflake documentation.
+    :param csv_mapper: specifies a function which must translate
+        an array of strings to a user-defined object.
+        SnowflakeIO uses a COPY INTO <location> statement to move data from
+        a Snowflake table to Google Cloud Storage as CSV files. These files
+        are then downloaded via FileIO and processed line by line.
+        Each line is split into an array of strings using the OpenCSV library.
+        The csv_mapper function's job is to give the user the possibility to
+        convert the array of strings to a user-defined type,
+        i.e. GenericRecord for Avro or Parquet files, or custom objects.
+        Example:
+        def csv_mapper(strings_array):
+          return User(strings_array[0], int(strings_array[1]))
+    :param table: specifies a Snowflake table name.
+    :param query: specifies a Snowflake custom SQL query.
+    :param role: specifies a Snowflake role.
+    :param warehouse: specifies a Snowflake warehouse name.
+    :param expansion_service: specifies URL of expansion service.
+
+    Authentication parameters:
+
+    :param username: specifies username for
+        username/password authentication method.
+    :param password: specifies password for
+        username/password authentication method.
+    :param private_key_path: specifies a private key file for
+        key pair authentication method.
+    :param private_key_passphrase: specifies a passphrase for
+        key pair authentication method.
+    :param o_auth_token: specifies access token for
+        OAuth authentication method.
+    """
+    verify_credentials(
+        username,
+        password,
+        private_key_path,
+        private_key_passphrase,
+        o_auth_token,
+    )
+
+    self.params = ReadFromSnowflakeSchema(
+        server_name=server_name,
+        schema=schema,
+        database=database,
+        staging_bucket_name=staging_bucket_name,
+        storage_integration_name=storage_integration_name,
+        username=username,
+        password=password,
+        private_key_path=private_key_path,
+        private_key_passphrase=private_key_passphrase,
+        o_auth_token=o_auth_token,
+        table=table,
+        query=query,
+        role=role,
+        warehouse=warehouse,
+    )
+    self.csv_mapper = csv_mapper
+    self.expansion_service = expansion_service or default_io_expansion_service()
+
+  def expand(self, pbegin):
+    return (
+        pbegin
+        | ExternalTransform(
+            self.URN,
+            NamedTupleBasedPayloadBuilder(self.params),
+            self.expansion_service,
+        )
+        | 'CSV to array mapper' >> beam.Map(lambda csv: csv.split(b','))
+        | 'CSV mapper' >> beam.Map(self.csv_mapper))
+
+
+WriteToSnowflakeSchema = NamedTuple(
+    'WriteToSnowflakeSchema',
+    [
+        ('server_name', unicode),
+        ('schema', unicode),
+        ('database', unicode),
+        ('staging_bucket_name', unicode),
+        ('storage_integration_name', unicode),
+        ('create_disposition', unicode),
+        ('write_disposition', unicode),
+        ('table_schema', unicode),
+        ('username', Optional[unicode]),
+        ('password', Optional[unicode]),
+        ('private_key_path', Optional[unicode]),
+        ('private_key_passphrase', Optional[unicode]),
+        ('o_auth_token', Optional[unicode]),
+        ('table', Optional[unicode]),
+        ('query', Optional[unicode]),
+        ('role', Optional[unicode]),
+        ('warehouse', Optional[unicode]),
+    ],
+)
+
+
+class WriteToSnowflake(beam.PTransform):
+  """
+    An external PTransform which writes to Snowflake.
+  """
+
+  URN = 'beam:external:java:snowflake:write:v1'
+
+  def __init__(
+      self,
+      server_name,
+      schema,
+      database,
+      staging_bucket_name,
+      storage_integration_name,
+      create_disposition,
+      write_disposition,
+      table_schema,
+      user_data_mapper,
+      username=None,
+      password=None,
+      private_key_path=None,
+      private_key_passphrase=None,
+      o_auth_token=None,
+      table=None,
+      query=None,
+      role=None,
+      warehouse=None,
+      expansion_service=None,
+  ):
+    # pylint: disable=line-too-long
+
+    """
+    Initializes a write operation to Snowflake.
+
+    Required parameters:
+
+    :param server_name: full Snowflake server name with the following format
+        https://account.region.gcp.snowflakecomputing.com.
+    :param schema: name of the Snowflake schema in the database to use.
+    :param database: name of the Snowflake database to use.
+    :param staging_bucket_name: name of the Google Cloud Storage bucket.
+        Bucket will be used as a temporary location for storing CSV files.
+    :param storage_integration_name: is the name of a Snowflake storage
+        integration object created according to Snowflake documentation for the
+        GCS bucket.
+    :param user_data_mapper: specifies a function which maps data from
+        a PCollection to an array of String values before the write operation
+        saves the data to temporary .csv files.
+        Example:
+        def user_data_mapper(user):
+          return [user.name, str(user.age)]
+    :param table: specifies a Snowflake table name
+    :param query: specifies a custom SQL query
+    :param role: specifies a Snowflake role.
+    :param warehouse: specifies a Snowflake warehouse name.
+    :param expansion_service: specifies URL of expansion service.
+
+    Authentication parameters:
+
+    :param username: specifies username for
+        username/password authentication method.
+    :param password: specifies password for
+        username/password authentication method.
+    :param private_key_path: specifies a private key file for
+        key pair authentication method.
+    :param private_key_passphrase: specifies a passphrase for
+        key pair authentication method.
+    :param o_auth_token: specifies access token for
+        OAuth authentication method.
+
+    Additional parameters:
+
+    :param create_disposition: Defines the behaviour of the write operation if
+        the target table does not exist. The following values are supported:
+        CREATE_IF_NEEDED - default behaviour. The write operation checks whether
+        the specified target table exists; if it does not, the write operation
+        attempts to create the table. Specify the schema for the target table
+        using the table_schema parameter.
+        CREATE_NEVER - The write operation fails if the target table does not
+        exist.
+    :param write_disposition: Defines the write behaviour based on the table
+        where data will be written to. The following values are supported:
+        APPEND - Default behaviour. Written data is added to the existing rows
+        in the table,
+        EMPTY - The target table must be empty; otherwise, the write operation
+        fails,
+        TRUNCATE - The write operation deletes all rows from the target table
+        before writing to it.
+    :param table_schema: When the create_disposition parameter is set to
+        CREATE_IF_NEEDED, the table_schema parameter enables specifying the
+        schema for the created target table. A table schema is a JSON string
+        with the following structure:
+        {"schema":[
+        {
+        "dataType":{"type":"<COLUMN DATA TYPE>"},
+        "name":"<COLUMN NAME>",
+        "nullable": <NULLABLE>
+        },
+        ]}
+
+        All supported data types:
+        {"schema":[
+        {"dataType":{"type":"date"},"name":"","nullable":false},
+        {"dataType":{"type":"datetime"},"name":"","nullable":false},
+        {"dataType":{"type":"time"},"name":"","nullable":false},
+        {"dataType":{"type":"timestamp"},"name":"","nullable":false},
+        {"dataType":{"type":"timestamp_ltz"},"name":"","nullable":false},
+        {"dataType":{"type":"timestamp_ntz"},"name":"","nullable":false},
+        {"dataType":{"type":"timestamp_tz"},"name":"","nullable":false},
+        {"dataType":{"type":"boolean"},"name":"","nullable":false},
+        {"dataType":{"type":"decimal","precision":38,"scale":1},"name":"","nullable":true},
+        {"dataType":{"type":"double"},"name":"","nullable":false},
+        {"dataType":{"type":"float"},"name":"","nullable":false},
+        {"dataType":{"type":"integer","precision":38,"scale":0},"name":"","nullable":false},
+        {"dataType":{"type":"number","precision":38,"scale":1},"name":"","nullable":false},
+        {"dataType":{"type":"numeric","precision":38,"scale":2},"name":"","nullable":false},
+        {"dataType":{"type":"real"},"name":"","nullable":false},
+        {"dataType":{"type":"array"},"name":"","nullable":false},
+        {"dataType":{"type":"object"},"name":"","nullable":false},
+        {"dataType":{"type":"variant"},"name":"","nullable":true},
+        {"dataType":{"type":"binary","size":null},"name":"","nullable":false},
+        {"dataType":{"type":"char","length":1},"name":"","nullable":false},
+        {"dataType":{"type":"string","length":null},"name":"","nullable":false},
+        {"dataType":{"type":"text","length":null},"name":"","nullable":false},
+        {"dataType":{"type":"varbinary","size":null},"name":"","nullable":false},
+        {"dataType":{"type":"varchar","length":100},"name":"","nullable":false}]
+        }
+    """
+    verify_credentials(
+        username,
+        password,
+        private_key_path,
+        private_key_passphrase,
+        o_auth_token,
+    )
+    WriteDisposition.VerifyParam(write_disposition)
+    CreateDisposition.VerifyParam(create_disposition)
+
+    self.params = WriteToSnowflakeSchema(
+        server_name=server_name,
+        schema=schema,
+        database=database,
+        staging_bucket_name=staging_bucket_name,
+        storage_integration_name=storage_integration_name,
+        create_disposition=create_disposition,
+        write_disposition=write_disposition,
+        table_schema=table_schema,
+        username=username,
+        password=password,
+        private_key_path=private_key_path,
+        private_key_passphrase=private_key_passphrase,
+        o_auth_token=o_auth_token,
+        table=table,
+        query=query,
+        role=role,
+        warehouse=warehouse,
+    )
+    self.user_data_mapper = user_data_mapper
+    self.expansion_service = expansion_service or default_io_expansion_service()
+
+  def expand(self, pbegin):
+    return (
+        pbegin
+        | 'User data mapper' >> beam.Map(
+            self.user_data_mapper).with_output_types(List[bytes])
+        | ExternalTransform(
+            self.URN,
+            NamedTupleBasedPayloadBuilder(self.params),
+            self.expansion_service))
+
+
+class CreateDisposition:
+  """
+  Enum class for possible values of create dispositions:
+  CREATE_IF_NEEDED: default behaviour. The write operation checks whether
+  the specified target table exists; if it does not, the write operation
+  attempts to create the table. Specify the schema for the target table
+  using the table_schema parameter.
+  CREATE_NEVER: The write operation fails if the target table does not exist.
+  """
+  CREATE_IF_NEEDED = 'CREATE_IF_NEEDED'
+  CREATE_NEVER = 'CREATE_NEVER'
+
+  @staticmethod
+  def VerifyParam(field):
+    if field and not hasattr(CreateDisposition, field):
+      raise RuntimeError(
+          'Create disposition has to be one of the following values: '
+          'CREATE_IF_NEEDED, CREATE_NEVER. Got: {}'.format(field))
+
+
+class WriteDisposition:
+  """
+  Enum class for possible values of write dispositions:
+  APPEND: Default behaviour. Written data is added to the existing rows
+  in the table,
+  EMPTY: The target table must be empty; otherwise, the write operation fails,
+  TRUNCATE: The write operation deletes all rows from the target table
+  before writing to it.
+  """
+  APPEND = 'APPEND'
+  EMPTY = 'EMPTY'
+  TRUNCATE = 'TRUNCATE'
+
+  @staticmethod
+  def VerifyParam(field):
+    if field and not hasattr(WriteDisposition, field):
+      raise RuntimeError(
+          'Write disposition has to be one of the following values: '
+          'APPEND, EMPTY, TRUNCATE. Got: {}'.format(field))
+
+
+def verify_credentials(
+    username, password, private_key_path, private_key_passphrase, o_auth_token):
+  if not (o_auth_token or (username and password) or
+          (username and private_key_path and private_key_passphrase)):
+    raise RuntimeError('Snowflake credentials are not set correctly.')
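
For the custom expansion service option described in the module docstring, the
address is passed through the expansion_service parameter. A sketch using the
Flink Job Server's built-in expansion service on port 8097 (all Snowflake values
are placeholders, and the identity csv_mapper stands in for a real one):

    from apache_beam.io.snowflake import ReadFromSnowflake

    read = ReadFromSnowflake(
        server_name='https://account.region.gcp.snowflakecomputing.com',
        schema='PUBLIC',
        database='TEST_DB',
        staging_bucket_name='gs://bucket/',
        storage_integration_name='integration',
        # Placeholder mapper: keep the raw list of column values.
        csv_mapper=lambda fields: fields,
        username='user',
        password='password',
        table='TEST_TABLE',
        expansion_service='localhost:8097')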