Posted to commits@beam.apache.org by mw...@apache.org on 2020/08/28 07:46:06 UTC
[beam] branch master updated: [BEAM-9898][BEAM-9899] SnowflakeIO.Write for cross-language with python wrapper and integration test (#12509)
This is an automated email from the ASF dual-hosted git repository.
mwalenia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 78d7636 [BEAM-9898][BEAM-9899] SnowflakeIO.Write for cross-language with python wrapper and integration test (#12509)
78d7636 is described below
commit 78d7636cac9234dbcc7145bb0330275c557c0010
Author: Piotr Szuberski <pi...@polidea.com>
AuthorDate: Fri Aug 28 09:45:24 2020 +0200
[BEAM-9898][BEAM-9899] SnowflakeIO.Write for cross-language with python wrapper and integration test (#12509)
Add:
* External transform registrar for SnowflakeIO.Write for cross-language usage
* Python wrapper for the above registrar
* Integration test for the SnowflakeIO Python wrappers.
---
.../io/snowflake/expansion-service/build.gradle | 3 +-
.../credentials/SnowflakeCredentialsFactory.java | 4 +-
...ration.java => CrossLanguageConfiguration.java} | 20 +-
...nowflakeReadRegistrar.java => ReadBuilder.java} | 62 ++-
.../crosslanguage/SnowflakeTransformRegistrar.java | 42 ++
.../io/snowflake/crosslanguage/WriteBuilder.java | 105 +++++
.../sdk/io/snowflake/data/SnowflakeDataType.java | 54 +++
sdks/python/apache_beam/io/external/snowflake.py | 204 ---------
.../io/external/xlang_snowflakeio_it_test.py | 269 ++++++++++++
sdks/python/apache_beam/io/snowflake.py | 474 +++++++++++++++++++++
10 files changed, 991 insertions(+), 246 deletions(-)
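
Before the diff itself, a rough sketch of how the Python wrapper added by this commit is meant to be used. Only the transform, disposition, and parameter names come from sdks/python/apache_beam/io/snowflake.py below; the connection values, element type, schema string, and mapper are illustrative placeholders, not part of this commit:

    from collections import namedtuple

    import apache_beam as beam
    from apache_beam.io.snowflake import CreateDisposition
    from apache_beam.io.snowflake import WriteDisposition
    from apache_beam.io.snowflake import WriteToSnowflake

    User = namedtuple('User', ['name', 'age'])  # illustrative element type

    # Illustrative table schema JSON; the supported "type" names are listed in
    # the SnowflakeDataType changes further down.
    TABLE_SCHEMA = """
    {"schema":[
      {"dataType":{"type":"varchar","length":100},"name":"name","nullable":false},
      {"dataType":{"type":"integer","precision":38,"scale":0},"name":"age","nullable":false}
    ]}
    """

    # Maps each element to a list of byte strings, one per column, before the
    # write transform stages the data as CSV files in the GCS bucket.
    def user_data_mapper(user):
        return [user.name.encode('utf-8'), str(user.age).encode('utf-8')]

    with beam.Pipeline() as p:
        _ = (
            p
            | beam.Create([User('alice', 30), User('bob', 25)])
            | WriteToSnowflake(
                server_name='https://account.region.gcp.snowflakecomputing.com',
                username='user',
                password='secret',
                schema='PUBLIC',
                database='DB',
                staging_bucket_name='gs://bucket/',
                storage_integration_name='integration',
                create_disposition=CreateDisposition.CREATE_IF_NEEDED,
                write_disposition=WriteDisposition.TRUNCATE,
                table_schema=TABLE_SCHEMA,
                user_data_mapper=user_data_mapper,
                table='TEST_TABLE'))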
diff --git a/sdks/java/io/snowflake/expansion-service/build.gradle b/sdks/java/io/snowflake/expansion-service/build.gradle
index 8a6ea6c..c6e14e1 100644
--- a/sdks/java/io/snowflake/expansion-service/build.gradle
+++ b/sdks/java/io/snowflake/expansion-service/build.gradle
@@ -20,7 +20,8 @@ apply plugin: 'org.apache.beam.module'
apply plugin: 'application'
mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService"
-applyJavaNature(enableChecker:false,
+applyJavaNature(
+ enableChecker:true,
automaticModuleName: 'org.apache.beam.sdk.io.expansion.service',
exportJavadoc: false,
validateShadowJar: false,
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java
index 2b45dc1..765f6dc 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java
@@ -18,7 +18,7 @@
package org.apache.beam.sdk.io.snowflake.credentials;
import org.apache.beam.sdk.io.snowflake.SnowflakePipelineOptions;
-import org.apache.beam.sdk.io.snowflake.crosslanguage.SnowflakeReadRegistrar;
+import org.apache.beam.sdk.io.snowflake.crosslanguage.CrossLanguageConfiguration;
/**
* Factory class for creating implementations of {@link SnowflakeCredentials} from {@link
@@ -38,7 +38,7 @@ public class SnowflakeCredentialsFactory {
throw new RuntimeException("Can't get credentials from Options");
}
- public static SnowflakeCredentials of(SnowflakeReadRegistrar.ReadConfiguration c) {
+ public static SnowflakeCredentials of(CrossLanguageConfiguration c) {
if (oauthOptionsAvailable(c.getOAuthToken())) {
return new OAuthTokenSnowflakeCredentials(c.getOAuthToken());
} else if (usernamePasswordOptionsAvailable(c.getUsername(), c.getPassword())) {
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/Configuration.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/CrossLanguageConfiguration.java
similarity index 89%
rename from sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/Configuration.java
rename to sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/CrossLanguageConfiguration.java
index 38162ae..920f343 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/Configuration.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/CrossLanguageConfiguration.java
@@ -18,7 +18,7 @@
package org.apache.beam.sdk.io.snowflake.crosslanguage;
/** Parameters abstract class to expose the transforms to an external SDK. */
-public abstract class Configuration {
+public abstract class CrossLanguageConfiguration {
private String serverName;
private String username;
private String password;
@@ -27,6 +27,8 @@ public abstract class Configuration {
private String oAuthToken;
private String database;
private String schema;
+ private String role;
+ private String warehouse;
private String table;
private String query;
private String stagingBucketName;
@@ -96,6 +98,22 @@ public abstract class Configuration {
this.schema = schema;
}
+ public String getRole() {
+ return role;
+ }
+
+ public void setRole(String role) {
+ this.role = role;
+ }
+
+ public String getWarehouse() {
+ return warehouse;
+ }
+
+ public void setWarehouse(String warehouse) {
+ this.warehouse = warehouse;
+ }
+
public String getTable() {
return table;
}
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeReadRegistrar.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/ReadBuilder.java
similarity index 51%
rename from sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeReadRegistrar.java
rename to sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/ReadBuilder.java
index 1e7be0f..c811cf2 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeReadRegistrar.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/ReadBuilder.java
@@ -17,14 +17,12 @@
*/
package org.apache.beam.sdk.io.snowflake.crosslanguage;
-import com.google.auto.service.AutoService;
import java.io.Serializable;
import java.nio.charset.Charset;
-import java.util.Map;
import javax.sql.DataSource;
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentials;
import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory;
@@ -33,47 +31,35 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-/** Exposes {@link SnowflakeIO.Read} as an external transform for cross-language usage. */
-@Experimental
-@AutoService(ExternalTransformRegistrar.class)
-public final class SnowflakeReadRegistrar implements ExternalTransformRegistrar {
-
- public static final String URN = "beam:external:java:snowflake:read:v1";
-
- @Override
- public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
- return ImmutableMap.of(URN, ReadBuilder.class);
- }
+@Experimental(Kind.PORTABILITY)
+public class ReadBuilder
+ implements ExternalTransformBuilder<ReadBuilder.Configuration, PBegin, PCollection<byte[]>> {
/** Parameters class to expose the transform to an external SDK. */
- public static class ReadConfiguration extends Configuration {}
+ public static class Configuration extends CrossLanguageConfiguration {}
- public static class ReadBuilder
- implements ExternalTransformBuilder<ReadConfiguration, PBegin, PCollection<byte[]>> {
- public ReadBuilder() {}
-
- @Override
- public PTransform<PBegin, PCollection<byte[]>> buildExternal(ReadConfiguration c) {
- SnowflakeCredentials credentials = SnowflakeCredentialsFactory.of(c);
+ @Override
+ public PTransform<PBegin, PCollection<byte[]>> buildExternal(Configuration c) {
+ SnowflakeCredentials credentials = SnowflakeCredentialsFactory.of(c);
- SerializableFunction<Void, DataSource> dataSourceSerializableFunction =
- SnowflakeIO.DataSourceProviderFromDataSourceConfiguration.of(
- SnowflakeIO.DataSourceConfiguration.create(credentials)
- .withServerName(c.getServerName())
- .withDatabase(c.getDatabase())
- .withSchema(c.getSchema()));
+ SerializableFunction<Void, DataSource> dataSourceSerializableFunction =
+ SnowflakeIO.DataSourceProviderFromDataSourceConfiguration.of(
+ SnowflakeIO.DataSourceConfiguration.create(credentials)
+ .withServerName(c.getServerName())
+ .withDatabase(c.getDatabase())
+ .withSchema(c.getSchema())
+ .withRole(c.getRole())
+ .withWarehouse(c.getWarehouse()));
- return SnowflakeIO.<byte[]>read()
- .withStorageIntegrationName(c.getStorageIntegrationName())
- .withStagingBucketName(c.getStagingBucketName())
- .withDataSourceProviderFn(dataSourceSerializableFunction)
- .withCsvMapper(CsvMapper.getCsvMapper())
- .withCoder(ByteArrayCoder.of())
- .fromTable(c.getTable())
- .fromQuery(c.getQuery());
- }
+ return SnowflakeIO.<byte[]>read()
+ .withStorageIntegrationName(c.getStorageIntegrationName())
+ .withStagingBucketName(c.getStagingBucketName())
+ .withDataSourceProviderFn(dataSourceSerializableFunction)
+ .withCsvMapper(CsvMapper.getCsvMapper())
+ .withCoder(ByteArrayCoder.of())
+ .fromTable(c.getTable())
+ .fromQuery(c.getQuery());
}
private static class CsvMapper implements Serializable {
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeTransformRegistrar.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeTransformRegistrar.java
new file mode 100644
index 0000000..e69219a
--- /dev/null
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeTransformRegistrar.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake.crosslanguage;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
+import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Exposes {@link SnowflakeIO.Read} and {@link SnowflakeIO.Write} as an external transform for
+ * cross-language usage.
+ */
+@Experimental(Kind.PORTABILITY)
+@AutoService(ExternalTransformRegistrar.class)
+public final class SnowflakeTransformRegistrar implements ExternalTransformRegistrar {
+ public static final String READ_URN = "beam:external:java:snowflake:read:v1";
+ public static final String WRITE_URN = "beam:external:java:snowflake:write:v1";
+
+ @Override
+ public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
+ return ImmutableMap.of(READ_URN, new ReadBuilder(), WRITE_URN, new WriteBuilder());
+ }
+}
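
For reference, the URNs registered above are what the Python wrapper (added later in this diff) passes to ExternalTransform; the expansion service resolves the URN to the matching builder and returns the expanded Java transform. A rough sketch of that hand-off on the Python side (the params NamedTuple and expansion_service value are assumed to exist):

    from apache_beam.transforms.external import ExternalTransform
    from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder

    # Must match SnowflakeTransformRegistrar.WRITE_URN on the Java side.
    WRITE_URN = 'beam:external:java:snowflake:write:v1'

    # WriteToSnowflake.expand() builds a payload from its NamedTuple of
    # parameters and asks the expansion service to expand the transform
    # registered under this URN (WriteBuilder -> SnowflakeIO.Write).
    external_write = ExternalTransform(
        WRITE_URN,
        NamedTupleBasedPayloadBuilder(params),  # params: WriteToSnowflakeSchema instance (assumed)
        expansion_service)                      # e.g. default_io_expansion_service() (assumed)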
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/WriteBuilder.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/WriteBuilder.java
new file mode 100644
index 0000000..9ca4c73
--- /dev/null
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/WriteBuilder.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake.crosslanguage;
+
+import java.io.IOException;
+import java.util.List;
+import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
+import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentials;
+import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory;
+import org.apache.beam.sdk.io.snowflake.data.SnowflakeTableSchema;
+import org.apache.beam.sdk.io.snowflake.enums.CreateDisposition;
+import org.apache.beam.sdk.io.snowflake.enums.WriteDisposition;
+import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+@Experimental(Kind.PORTABILITY)
+public class WriteBuilder
+ implements ExternalTransformBuilder<WriteBuilder.Configuration, PCollection<byte[]>, PDone> {
+
+ /** Parameters class to expose the transform to an external SDK. */
+ public static class Configuration extends CrossLanguageConfiguration {
+ private SnowflakeTableSchema tableSchema;
+ private CreateDisposition createDisposition;
+ private WriteDisposition writeDisposition;
+
+ public void setTableSchema(String tableSchema) {
+ ObjectMapper mapper = new ObjectMapper();
+
+ try {
+ this.tableSchema = mapper.readValue(tableSchema, SnowflakeTableSchema.class);
+ } catch (IOException e) {
+ throw new RuntimeException("Format of provided table schema is invalid");
+ }
+ }
+
+ public void setCreateDisposition(String createDisposition) {
+ this.createDisposition = CreateDisposition.valueOf(createDisposition);
+ }
+
+ public void setWriteDisposition(String writeDisposition) {
+ this.writeDisposition = WriteDisposition.valueOf(writeDisposition);
+ }
+
+ public SnowflakeTableSchema getTableSchema() {
+ return tableSchema;
+ }
+
+ public CreateDisposition getCreateDisposition() {
+ return createDisposition;
+ }
+
+ public WriteDisposition getWriteDisposition() {
+ return writeDisposition;
+ }
+ }
+
+ @Override
+ public PTransform<PCollection<byte[]>, PDone> buildExternal(Configuration c) {
+ SnowflakeCredentials credentials = SnowflakeCredentialsFactory.of(c);
+
+ SnowflakeIO.DataSourceConfiguration dataSourceConfiguration =
+ SnowflakeIO.DataSourceConfiguration.create(credentials)
+ .withServerName(c.getServerName())
+ .withDatabase(c.getDatabase())
+ .withSchema(c.getSchema())
+ .withRole(c.getRole())
+ .withWarehouse(c.getWarehouse());
+
+ return SnowflakeIO.<byte[]>write()
+ .withDataSourceConfiguration(dataSourceConfiguration)
+ .withStorageIntegrationName(c.getStorageIntegrationName())
+ .withStagingBucketName(c.getStagingBucketName())
+ .withTableSchema(c.getTableSchema())
+ .withCreateDisposition(c.getCreateDisposition())
+ .withWriteDisposition(c.getWriteDisposition())
+ .withUserDataMapper(getStringCsvMapper())
+ .withQueryTransformation(c.getQuery())
+ .to(c.getTable());
+ }
+
+ private static SnowflakeIO.UserDataMapper<List<byte[]>> getStringCsvMapper() {
+ return (SnowflakeIO.UserDataMapper<List<byte[]>>)
+ recordLine -> recordLine.stream().map(String::new).toArray();
+ }
+}
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/data/SnowflakeDataType.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/data/SnowflakeDataType.java
index 4b8caf5..babe7dc 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/data/SnowflakeDataType.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/data/SnowflakeDataType.java
@@ -18,8 +18,62 @@
package org.apache.beam.sdk.io.snowflake.data;
import java.io.Serializable;
+import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonSubTypes;
+import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.beam.sdk.io.snowflake.data.datetime.SnowflakeDate;
+import org.apache.beam.sdk.io.snowflake.data.datetime.SnowflakeDateTime;
+import org.apache.beam.sdk.io.snowflake.data.datetime.SnowflakeTime;
+import org.apache.beam.sdk.io.snowflake.data.datetime.SnowflakeTimestamp;
+import org.apache.beam.sdk.io.snowflake.data.datetime.SnowflakeTimestampLTZ;
+import org.apache.beam.sdk.io.snowflake.data.datetime.SnowflakeTimestampNTZ;
+import org.apache.beam.sdk.io.snowflake.data.datetime.SnowflakeTimestampTZ;
+import org.apache.beam.sdk.io.snowflake.data.logical.SnowflakeBoolean;
+import org.apache.beam.sdk.io.snowflake.data.numeric.SnowflakeDecimal;
+import org.apache.beam.sdk.io.snowflake.data.numeric.SnowflakeDouble;
+import org.apache.beam.sdk.io.snowflake.data.numeric.SnowflakeFloat;
+import org.apache.beam.sdk.io.snowflake.data.numeric.SnowflakeInteger;
+import org.apache.beam.sdk.io.snowflake.data.numeric.SnowflakeNumber;
+import org.apache.beam.sdk.io.snowflake.data.numeric.SnowflakeNumeric;
+import org.apache.beam.sdk.io.snowflake.data.numeric.SnowflakeReal;
+import org.apache.beam.sdk.io.snowflake.data.structured.SnowflakeArray;
+import org.apache.beam.sdk.io.snowflake.data.structured.SnowflakeObject;
+import org.apache.beam.sdk.io.snowflake.data.structured.SnowflakeVariant;
+import org.apache.beam.sdk.io.snowflake.data.text.SnowflakeBinary;
+import org.apache.beam.sdk.io.snowflake.data.text.SnowflakeChar;
+import org.apache.beam.sdk.io.snowflake.data.text.SnowflakeString;
+import org.apache.beam.sdk.io.snowflake.data.text.SnowflakeText;
+import org.apache.beam.sdk.io.snowflake.data.text.SnowflakeVarBinary;
+import org.apache.beam.sdk.io.snowflake.data.text.SnowflakeVarchar;
/** Interface for data types to provide SQLs for themselves. */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
+@JsonSubTypes({
+ @Type(value = SnowflakeDate.class, name = "date"),
+ @Type(value = SnowflakeDateTime.class, name = "datetime"),
+ @Type(value = SnowflakeTime.class, name = "time"),
+ @Type(value = SnowflakeTimestamp.class, name = "timestamp"),
+ @Type(value = SnowflakeTimestampLTZ.class, name = "timestamp_ltz"),
+ @Type(value = SnowflakeTimestampNTZ.class, name = "timestamp_ntz"),
+ @Type(value = SnowflakeTimestampTZ.class, name = "timestamp_tz"),
+ @Type(value = SnowflakeBoolean.class, name = "boolean"),
+ @Type(value = SnowflakeDecimal.class, name = "decimal"),
+ @Type(value = SnowflakeDouble.class, name = "double"),
+ @Type(value = SnowflakeFloat.class, name = "float"),
+ @Type(value = SnowflakeInteger.class, name = "integer"),
+ @Type(value = SnowflakeNumber.class, name = "number"),
+ @Type(value = SnowflakeNumeric.class, name = "numeric"),
+ @Type(value = SnowflakeReal.class, name = "real"),
+ @Type(value = SnowflakeArray.class, name = "array"),
+ @Type(value = SnowflakeObject.class, name = "object"),
+ @Type(value = SnowflakeVariant.class, name = "variant"),
+ @Type(value = SnowflakeBinary.class, name = "binary"),
+ @Type(value = SnowflakeChar.class, name = "char"),
+ @Type(value = SnowflakeString.class, name = "string"),
+ @Type(value = SnowflakeText.class, name = "text"),
+ @Type(value = SnowflakeVarBinary.class, name = "varbinary"),
+ @Type(value = SnowflakeVarchar.class, name = "varchar"),
+})
public interface SnowflakeDataType extends Serializable {
String sql();
}
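
The @JsonSubTypes annotation added above is what lets WriteBuilder.Configuration#setTableSchema deserialize a table schema passed from Python: the "type" property of each dataType entry selects the concrete SnowflakeDataType subclass. A schema string exercising a few of the registered names might look like the following (column names are illustrative; the same shape is used by SCHEMA_STRING in the integration test below):

    # Each "type" value must be one of the names registered in @JsonSubTypes,
    # e.g. "integer", "boolean", "binary", "varchar", "timestamp_ntz", ...
    TABLE_SCHEMA = """
    {"schema":[
      {"dataType":{"type":"integer","precision":38,"scale":0},"name":"id","nullable":false},
      {"dataType":{"type":"boolean"},"name":"active","nullable":false},
      {"dataType":{"type":"binary","size":100},"name":"payload","nullable":true}
    ]}
    """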
diff --git a/sdks/python/apache_beam/io/external/snowflake.py b/sdks/python/apache_beam/io/external/snowflake.py
deleted file mode 100644
index e7ffa6a..0000000
--- a/sdks/python/apache_beam/io/external/snowflake.py
+++ /dev/null
@@ -1,204 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Snowflake transforms tested against Flink portable runner.
-
- **Setup**
-
- Transforms provided in this module are cross-language transforms
- implemented in the Beam Java SDK. During the pipeline construction, Python SDK
- will connect to a Java expansion service to expand these transforms.
- To facilitate this, a small amount of setup is needed before using these
- transforms in a Beam Python pipeline.
-
- There are several ways to setup cross-language Snowflake transforms.
-
- * Option 1: use the default expansion service
- * Option 2: specify a custom expansion service
-
- See below for details regarding each of these options.
-
- *Option 1: Use the default expansion service*
-
- This is the recommended and easiest setup option for using Python Snowflake
- transforms.This option requires following pre-requisites
- before running the Beam pipeline.
-
- * Install Java runtime in the computer from where the pipeline is constructed
- and make sure that 'java' command is available.
-
- In this option, Python SDK will either download (for released Beam version) or
- build (when running from a Beam Git clone) a expansion service jar and use
- that to expand transforms. Currently Snowflake transforms use the
- 'beam-sdks-java-io-expansion-service' jar for this purpose.
-
- *Option 2: specify a custom expansion service*
-
- In this option, you startup your own expansion service and provide that as
- a parameter when using the transforms provided in this module.
-
- This option requires following pre-requisites before running the Beam
- pipeline.
-
- * Startup your own expansion service.
- * Update your pipeline to provide the expansion service address when
- initiating Snowflake transforms provided in this module.
-
- Flink Users can use the built-in Expansion Service of the Flink Runner's
- Job Server. If you start Flink's Job Server, the expansion service will be
- started on port 8097. For a different address, please set the
- expansion_service parameter.
-
- **More information**
-
- For more information regarding cross-language transforms see:
- - https://beam.apache.org/roadmap/portability/
-
- For more information specific to Flink runner see:
- - https://beam.apache.org/documentation/runners/flink/
-"""
-
-# pytype: skip-file
-
-from __future__ import absolute_import
-
-import typing
-
-from past.builtins import unicode
-
-import apache_beam as beam
-from apache_beam.transforms.external import BeamJarExpansionService
-from apache_beam.transforms.external import ExternalTransform
-from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
-
-ReadFromSnowflakeSchema = typing.NamedTuple(
- 'ReadFromSnowflakeSchema',
- [
- ('server_name', unicode),
- ('schema', unicode),
- ('database', unicode),
- ('staging_bucket_name', unicode),
- ('storage_integration_name', unicode),
- ('username', typing.Optional[unicode]),
- ('password', typing.Optional[unicode]),
- ('private_key_path', typing.Optional[unicode]),
- ('private_key_passphrase', typing.Optional[unicode]),
- ('o_auth_token', typing.Optional[unicode]),
- ('table', typing.Optional[unicode]),
- ('query', typing.Optional[unicode]),
- ])
-
-
-def default_io_expansion_service():
- return BeamJarExpansionService(
- 'sdks:java:io:snowflake:expansion-service:shadowJar')
-
-
-class ReadFromSnowflake(beam.PTransform):
- """
- An external PTransform which reads from Snowflake.
- """
-
- URN = 'beam:external:java:snowflake:read:v1'
-
- def __init__(
- self,
- server_name,
- schema,
- database,
- staging_bucket_name,
- storage_integration_name,
- csv_mapper,
- username=None,
- password=None,
- private_key_path=None,
- private_key_passphrase=None,
- o_auth_token=None,
- table=None,
- query=None,
- expansion_service=None):
- """
- Initializes a read operation from Snowflake.
-
- Required parameters:
-
- :param server_name: full Snowflake server name with the following format
- account.region.gcp.snowflakecomputing.com.
- :param schema: name of the Snowflake schema in the database to use.
- :param database: name of the Snowflake database to use.
- :param staging_bucket_name: name of the Google Cloud Storage bucket.::
- Bucket will be used as a temporary location for storing CSV files.
- Those temporary directories will be named
- 'sf_copy_csv_DATE_TIME_RANDOMSUFFIX'
- and they will be removed automatically once Read operation finishes.
- :param storage_integration_name: is the name of storage integration
- object created according to Snowflake documentation.
- :param csv_mapper: specifies a function which must translate
- user-defined object to array of strings.
- SnowflakeIO uses a COPY INTO <location> statement to move data from
- a Snowflake table to Google Cloud Storage as CSV files.These files
- are then downloaded via FileIO and processed line by line.
- Each line is split into an array of Strings using the OpenCSV
- The csv_mapper function job is to give the user the possibility to
- convert the array of Strings to a user-defined type,
- ie. GenericRecord for Avro or Parquet files, or custom objects.
- Example:
- def csv_mapper(strings_array)
- return User(strings_array[0], int(strings_array[1])))
- :param table: specifies a Snowflake table name.
- :param query: specifies a Snowflake custom SQL query.
- :param expansion_service: specifies URL of expansion service.
-
- Authentication parameters:
-
- :param username: specifies username for
- username/password authentication method.
- :param password: specifies password for
- username/password authentication method.
- :param private_key_path: specifies a private key file for
- key/ pair authentication method.
- :param private_key_passphrase: specifies password for
- key/ pair authentication method.
- :param o_auth_token: specifies access token for
- OAuth authentication method.
- """
- self.params = ReadFromSnowflakeSchema(
- server_name=server_name,
- schema=schema,
- database=database,
- staging_bucket_name=staging_bucket_name,
- storage_integration_name=storage_integration_name,
- username=username,
- password=password,
- private_key_path=private_key_path,
- private_key_passphrase=private_key_passphrase,
- o_auth_token=o_auth_token,
- table=table,
- query=query)
- self.csv_mapper = csv_mapper
- self.expansion_service = expansion_service or default_io_expansion_service()
-
- def expand(self, pbegin):
- return (
- pbegin
- | ExternalTransform(
- self.URN,
- NamedTupleBasedPayloadBuilder(self.params),
- self.expansion_service,
- )
- | 'CSV to array mapper' >> beam.Map(lambda csv: csv.split(b','))
- | 'CSV mapper' >> beam.Map(self.csv_mapper))
diff --git a/sdks/python/apache_beam/io/external/xlang_snowflakeio_it_test.py b/sdks/python/apache_beam/io/external/xlang_snowflakeio_it_test.py
new file mode 100644
index 0000000..b90c021
--- /dev/null
+++ b/sdks/python/apache_beam/io/external/xlang_snowflakeio_it_test.py
@@ -0,0 +1,269 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for cross-language Snowflake IO operations.
+
+Example run:
+
+python setup.py nosetests --tests=apache_beam.io.external.xlang_snowflakeio_it_test \
+--test-pipeline-options="
+ --server_name=<SNOWFLAKE_SERVER_NAME>
+ --username=<SNOWFLAKE_USERNAME>
+ --password=<SNOWFLAKE_PASSWORD>
+ --private_key_path=<PATH_TO_PRIVATE_KEY>
+ --private_key_passphrase=<PASSWORD_TO_PRIVATE_KEY>
+ --o_auth_token=<TOKEN>
+ --staging_bucket_name=<GCP_BUCKET_PATH>
+ --storage_integration_name=<SNOWFLAKE_STORAGE_INTEGRATION_NAME>
+ --database=<DATABASE>
+ --schema=<SCHEMA>
+ --role=<ROLE>
+ --warehouse=<WAREHOUSE>
+ --table=<TABLE_NAME>
+ --runner=FlinkRunner"
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import binascii
+import logging
+import unittest
+from typing import ByteString
+from typing import NamedTuple
+
+import apache_beam as beam
+from apache_beam import coders
+from apache_beam.io.snowflake import CreateDisposition
+from apache_beam.io.snowflake import ReadFromSnowflake
+from apache_beam.io.snowflake import WriteDisposition
+from apache_beam.io.snowflake import WriteToSnowflake
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
+try:
+ from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+except ImportError:
+ GCSFileSystem = None
+# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
+
+SCHEMA_STRING = """
+{"schema":[
+ {"dataType":{"type":"integer","precision":38,"scale":0},"name":"number_column","nullable":false},
+ {"dataType":{"type":"boolean"},"name":"boolean_column","nullable":false},
+ {"dataType":{"type":"binary","size":100},"name":"bytes_column","nullable":true}
+]}
+"""
+
+TestRow = NamedTuple(
+ 'TestRow',
+ [
+ ('number_column', int),
+ ('boolean_column', bool),
+ ('bytes_column', ByteString),
+ ])
+
+coders.registry.register_coder(TestRow, coders.RowCoder)
+
+NUM_RECORDS = 100
+
+
+@unittest.skipIf(GCSFileSystem is None, 'GCP dependencies are not installed')
+@unittest.skipIf(
+ TestPipeline().get_option('server_name') is None,
+ 'Snowflake IT test requires external configuration to be run.')
+class SnowflakeTest(unittest.TestCase):
+ def test_snowflake_write_read(self):
+ self.run_write()
+ self.run_read()
+
+ def run_write(self):
+ def user_data_mapper(test_row):
+ return [
+ str(test_row.number_column).encode('utf-8'),
+ str(test_row.boolean_column).encode('utf-8'),
+ binascii.hexlify(test_row.bytes_column),
+ ]
+
+ with TestPipeline(options=PipelineOptions(self.pipeline_args)) as p:
+ p.not_use_test_runner_api = True
+ _ = (
+ p
+ | 'Impulse' >> beam.Impulse()
+ | 'Generate' >> beam.FlatMap(lambda x: range(NUM_RECORDS)) # pylint: disable=range-builtin-not-iterating
+ | 'Map to TestRow' >> beam.Map(
+ lambda num: TestRow(
+ num, num % 2 == 0, b"test" + str(num).encode()))
+ | WriteToSnowflake(
+ server_name=self.server_name,
+ username=self.username,
+ password=self.password,
+ o_auth_token=self.o_auth_token,
+ private_key_path=self.private_key_path,
+ private_key_passphrase=self.private_key_passphrase,
+ schema=self.schema,
+ database=self.database,
+ role=self.role,
+ warehouse=self.warehouse,
+ staging_bucket_name=self.staging_bucket_name,
+ storage_integration_name=self.storage_integration_name,
+ create_disposition=CreateDisposition.CREATE_IF_NEEDED,
+ write_disposition=WriteDisposition.TRUNCATE,
+ table_schema=SCHEMA_STRING,
+ user_data_mapper=user_data_mapper,
+ table=self.table,
+ query=None,
+ expansion_service=self.expansion_service,
+ ))
+
+ def run_read(self):
+ def csv_mapper(bytes_array):
+ return TestRow(
+ int(bytes_array[0]),
+ bytes_array[1] == b'true',
+ binascii.unhexlify(bytes_array[2]))
+
+ with TestPipeline(options=PipelineOptions(self.pipeline_args)) as p:
+ result = (
+ p
+ | ReadFromSnowflake(
+ server_name=self.server_name,
+ username=self.username,
+ password=self.password,
+ o_auth_token=self.o_auth_token,
+ private_key_path=self.private_key_path,
+ private_key_passphrase=self.private_key_passphrase,
+ schema=self.schema,
+ database=self.database,
+ role=self.role,
+ warehouse=self.warehouse,
+ staging_bucket_name=self.staging_bucket_name,
+ storage_integration_name=self.storage_integration_name,
+ csv_mapper=csv_mapper,
+ table=self.table,
+ query=None,
+ expansion_service=self.expansion_service,
+ ).with_output_types(TestRow))
+
+ assert_that(
+ result,
+ equal_to([
+ TestRow(i, i % 2 == 0, b'test' + str(i).encode())
+ for i in range(NUM_RECORDS)
+ ]))
+
+ def tearDown(self):
+ GCSFileSystem(pipeline_options=PipelineOptions()) \
+ .delete([self.staging_bucket_name])
+
+ def setUp(self):
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ '--server_name',
+ required=True,
+ help=(
+ 'Snowflake server name of the form '
+ 'https://<SNOWFLAKE_ACCOUNT_NAME>.snowflakecomputing.com'),
+ )
+ parser.add_argument(
+ '--username',
+ help='Snowflake username',
+ )
+ parser.add_argument(
+ '--password',
+ help='Snowflake password',
+ )
+ parser.add_argument(
+ '--private_key_path',
+ help='Path to private key',
+ )
+ parser.add_argument(
+ '--private_key_passphrase',
+ help='Password to private key',
+ )
+ parser.add_argument(
+ '--o_auth_token',
+ help='OAuth token',
+ )
+ parser.add_argument(
+ '--staging_bucket_name',
+ required=True,
+ help='GCP staging bucket name (must end with a slash)',
+ )
+ parser.add_argument(
+ '--storage_integration_name',
+ required=True,
+ help='Snowflake integration name',
+ )
+ parser.add_argument(
+ '--database',
+ required=True,
+ help='Snowflake database name',
+ )
+ parser.add_argument(
+ '--schema',
+ required=True,
+ help='Snowflake schema name',
+ )
+ parser.add_argument(
+ '--table',
+ required=True,
+ help='Snowflake table name',
+ )
+ parser.add_argument(
+ '--role',
+ help='Snowflake role',
+ )
+ parser.add_argument(
+ '--warehouse',
+ help='Snowflake warehouse name',
+ )
+ parser.add_argument(
+ '--expansion_service',
+ help='Url to externally launched expansion service.',
+ )
+
+ pipeline = TestPipeline()
+ argv = pipeline.get_full_options_as_args()
+
+ known_args, self.pipeline_args = parser.parse_known_args(argv)
+
+ self.server_name = known_args.server_name
+ self.database = known_args.database
+ self.schema = known_args.schema
+ self.table = known_args.table
+ self.username = known_args.username
+ self.password = known_args.password
+ self.private_key_path = known_args.private_key_path
+ self.private_key_passphrase = known_args.private_key_passphrase
+ self.o_auth_token = known_args.o_auth_token
+ self.staging_bucket_name = known_args.staging_bucket_name
+ self.storage_integration_name = known_args.storage_integration_name
+ self.role = known_args.role
+ self.warehouse = known_args.warehouse
+ self.expansion_service = known_args.expansion_service
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
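
One detail worth noting in the test above: the binary column goes through a hex round-trip so that its bytes survive the CSV staging format. A quick illustration of how the two mappers treat that column (values are just examples):

    import binascii

    # Write path: user_data_mapper turns the raw bytes into CSV-safe hex text.
    binascii.hexlify(b'test0')         # -> b'7465737430'

    # Read path: csv_mapper reverses it when the staged CSV is read back.
    binascii.unhexlify(b'7465737430')  # -> b'test0'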
diff --git a/sdks/python/apache_beam/io/snowflake.py b/sdks/python/apache_beam/io/snowflake.py
new file mode 100644
index 0000000..71321ca
--- /dev/null
+++ b/sdks/python/apache_beam/io/snowflake.py
@@ -0,0 +1,474 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Snowflake transforms tested against Flink portable runner.
+
+ **Setup**
+
+ Transforms provided in this module are cross-language transforms
+ implemented in the Beam Java SDK. During the pipeline construction, Python SDK
+ will connect to a Java expansion service to expand these transforms.
+ To facilitate this, a small amount of setup is needed before using these
+ transforms in a Beam Python pipeline.
+
+ There are several ways to set up cross-language Snowflake transforms.
+
+ * Option 1: use the default expansion service
+ * Option 2: specify a custom expansion service
+
+ See below for details regarding each of these options.
+
+ *Option 1: Use the default expansion service*
+
+ This is the recommended and easiest setup option for using Python Snowflake
+ transforms. This option requires the following prerequisites
+ before running the Beam pipeline.
+
+ * Install a Java runtime on the computer from which the pipeline is
+ constructed and make sure that the 'java' command is available.
+
+ In this option, the Python SDK will either download (for released Beam
+ versions) or build (when running from a Beam Git clone) an expansion service
+ jar and use that to expand transforms. Currently Snowflake transforms use the
+ 'beam-sdks-java-io-snowflake-expansion-service' jar for this purpose.
+
+ *Option 2: specify a custom expansion service*
+
+ In this option, you start up your own expansion service and provide that as
+ a parameter when using the transforms provided in this module.
+
+ This option requires the following prerequisites before running the Beam
+ pipeline.
+
+ * Start up your own expansion service.
+ * Update your pipeline to provide the expansion service address when
+ initiating Snowflake transforms provided in this module.
+
+ Flink Users can use the built-in Expansion Service of the Flink Runner's
+ Job Server. If you start Flink's Job Server, the expansion service will be
+ started on port 8097. For a different address, please set the
+ expansion_service parameter.
+
+ **More information**
+
+ For more information regarding cross-language transforms see:
+ - https://beam.apache.org/roadmap/portability/
+
+ For more information specific to Flink runner see:
+ - https://beam.apache.org/documentation/runners/flink/
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+
+from past.builtins import unicode
+
+import apache_beam as beam
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import ExternalTransform
+from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
+
+__all__ = [
+ 'ReadFromSnowflake',
+ 'WriteToSnowflake',
+ 'WriteDisposition',
+ 'CreateDisposition',
+]
+
+
+def default_io_expansion_service():
+ return BeamJarExpansionService(
+ 'sdks:java:io:snowflake:expansion-service:shadowJar')
+
+
+ReadFromSnowflakeSchema = NamedTuple(
+ 'ReadFromSnowflakeSchema',
+ [
+ ('server_name', unicode),
+ ('schema', unicode),
+ ('database', unicode),
+ ('staging_bucket_name', unicode),
+ ('storage_integration_name', unicode),
+ ('username', Optional[unicode]),
+ ('password', Optional[unicode]),
+ ('private_key_path', Optional[unicode]),
+ ('private_key_passphrase', Optional[unicode]),
+ ('o_auth_token', Optional[unicode]),
+ ('table', Optional[unicode]),
+ ('query', Optional[unicode]),
+ ('role', Optional[unicode]),
+ ('warehouse', Optional[unicode]),
+ ])
+
+
+class ReadFromSnowflake(beam.PTransform):
+ """
+ An external PTransform which reads from Snowflake.
+ """
+
+ URN = 'beam:external:java:snowflake:read:v1'
+
+ def __init__(
+ self,
+ server_name,
+ schema,
+ database,
+ staging_bucket_name,
+ storage_integration_name,
+ csv_mapper,
+ username=None,
+ password=None,
+ private_key_path=None,
+ private_key_passphrase=None,
+ o_auth_token=None,
+ table=None,
+ query=None,
+ role=None,
+ warehouse=None,
+ expansion_service=None):
+ """
+ Initializes a read operation from Snowflake.
+
+ Required parameters:
+
+ :param server_name: full Snowflake server name with the following format
+ https://account.region.gcp.snowflakecomputing.com.
+ :param schema: name of the Snowflake schema in the database to use.
+ :param database: name of the Snowflake database to use.
+ :param staging_bucket_name: name of the Google Cloud Storage bucket.
+ Bucket will be used as a temporary location for storing CSV files.
+ Those temporary directories will be named
+ 'sf_copy_csv_DATE_TIME_RANDOMSUFFIX'
+ and they will be removed automatically once Read operation finishes.
+ :param storage_integration_name: is the name of storage integration
+ object created according to Snowflake documentation.
+ :param csv_mapper: specifies a function which must translate a
+ user-defined object to an array of strings.
+ SnowflakeIO uses a COPY INTO <location> statement to move data from
+ a Snowflake table to Google Cloud Storage as CSV files. These files
+ are then downloaded via FileIO and processed line by line.
+ Each line is split into an array of strings using the OpenCSV library.
+ The job of the csv_mapper function is to give the user the possibility to
+ convert the array of strings to a user-defined type,
+ e.g. GenericRecord for Avro or Parquet files, or custom objects.
+ Example:
+ def csv_mapper(strings_array):
+ return User(strings_array[0], int(strings_array[1]))
+ :param table: specifies a Snowflake table name.
+ :param query: specifies a Snowflake custom SQL query.
+ :param role: specifies a Snowflake role.
+ :param warehouse: specifies a Snowflake warehouse name.
+ :param expansion_service: specifies URL of expansion service.
+
+ Authentication parameters:
+
+ :param username: specifies username for
+ username/password authentication method.
+ :param password: specifies password for
+ username/password authentication method.
+ :param private_key_path: specifies a private key file for
+ key/ pair authentication method.
+ :param private_key_passphrase: specifies password for
+ key/ pair authentication method.
+ :param o_auth_token: specifies access token for
+ OAuth authentication method.
+ """
+ verify_credentials(
+ username,
+ password,
+ private_key_path,
+ private_key_passphrase,
+ o_auth_token,
+ )
+
+ self.params = ReadFromSnowflakeSchema(
+ server_name=server_name,
+ schema=schema,
+ database=database,
+ staging_bucket_name=staging_bucket_name,
+ storage_integration_name=storage_integration_name,
+ username=username,
+ password=password,
+ private_key_path=private_key_path,
+ private_key_passphrase=private_key_passphrase,
+ o_auth_token=o_auth_token,
+ table=table,
+ query=query,
+ role=role,
+ warehouse=warehouse,
+ )
+ self.csv_mapper = csv_mapper
+ self.expansion_service = expansion_service or default_io_expansion_service()
+
+ def expand(self, pbegin):
+ return (
+ pbegin
+ | ExternalTransform(
+ self.URN,
+ NamedTupleBasedPayloadBuilder(self.params),
+ self.expansion_service,
+ )
+ | 'CSV to array mapper' >> beam.Map(lambda csv: csv.split(b','))
+ | 'CSV mapper' >> beam.Map(self.csv_mapper))
+
+
+WriteToSnowflakeSchema = NamedTuple(
+ 'WriteToSnowflakeSchema',
+ [
+ ('server_name', unicode),
+ ('schema', unicode),
+ ('database', unicode),
+ ('staging_bucket_name', unicode),
+ ('storage_integration_name', unicode),
+ ('create_disposition', unicode),
+ ('write_disposition', unicode),
+ ('table_schema', unicode),
+ ('username', Optional[unicode]),
+ ('password', Optional[unicode]),
+ ('private_key_path', Optional[unicode]),
+ ('private_key_passphrase', Optional[unicode]),
+ ('o_auth_token', Optional[unicode]),
+ ('table', Optional[unicode]),
+ ('query', Optional[unicode]),
+ ('role', Optional[unicode]),
+ ('warehouse', Optional[unicode]),
+ ],
+)
+
+
+class WriteToSnowflake(beam.PTransform):
+ """
+ An external PTransform which writes to Snowflake.
+ """
+
+ URN = 'beam:external:java:snowflake:write:v1'
+
+ def __init__(
+ self,
+ server_name,
+ schema,
+ database,
+ staging_bucket_name,
+ storage_integration_name,
+ create_disposition,
+ write_disposition,
+ table_schema,
+ user_data_mapper,
+ username=None,
+ password=None,
+ private_key_path=None,
+ private_key_passphrase=None,
+ o_auth_token=None,
+ table=None,
+ query=None,
+ role=None,
+ warehouse=None,
+ expansion_service=None,
+ ):
+ # pylint: disable=line-too-long
+
+ """
+ Initializes a write operation to Snowflake.
+
+ Required parameters:
+
+ :param server_name: full Snowflake server name with the following format
+ https://account.region.gcp.snowflakecomputing.com.
+ :param schema: name of the Snowflake schema in the database to use.
+ :param database: name of the Snowflake database to use.
+ :param staging_bucket_name: name of the Google Cloud Storage bucket.
+ Bucket will be used as a temporary location for storing CSV files.
+ :param storage_integration_name: is the name of a Snowflake storage
+ integration object created according to Snowflake documentation for the
+ GCS bucket.
+ :param user_data_mapper: specifies a function which maps data from
+ a PCollection to an array of String values before the write operation
+ saves the data to temporary .csv files.
+ Example:
+ def user_data_mapper(user):
+ return [user.name, str(user.age)]
+ :param table: specifies a Snowflake table name
+ :param query: specifies a custom SQL query
+ :param role: specifies a Snowflake role.
+ :param warehouse: specifies a Snowflake warehouse name.
+ :param expansion_service: specifies URL of expansion service.
+
+ Authentication parameters:
+
+ :param username: specifies username for
+ username/password authentication method.
+ :param password: specifies password for
+ username/password authentication method.
+ :param private_key_path: specifies a private key file for
+ key/ pair authentication method.
+ :param private_key_passphrase: specifies password for
+ key/ pair authentication method.
+ :param o_auth_token: specifies access token for
+ OAuth authentication method.
+
+ Additional parameters:
+
+ :param create_disposition: Defines the behaviour of the write operation if
+ the target table does not exist. The following values are supported:
+ CREATE_IF_NEEDED - default behaviour. The write operation checks whether
+ the specified target table exists; if it does not, the write operation
+ attempts to create the table. Specify the schema for the target table
+ using the table_schema parameter.
+ CREATE_NEVER - The write operation fails if the target table does not
+ exist.
+ :param write_disposition: Defines the write behaviour based on the table
+ where data will be written to. The following values are supported:
+ APPEND - Default behaviour. Written data is added to the existing rows
+ in the table,
+ EMPTY - The target table must be empty; otherwise, the write operation
+ fails,
+ TRUNCATE - The write operation deletes all rows from the target table
+ before writing to it.
+ :param table_schema: When the create_disposition parameter is set to
+ CREATE_IF_NEEDED, the table_schema parameter enables specifying the
+ schema for the created target table. A table schema is a JSON document with the
+ following structure:
+ {"schema":[
+ {
+ "dataType":{"type":"<COLUMN DATA TYPE>"},
+ "name":"<COLUMN NAME> ",
+ "nullable": <NULLABLE>
+ },
+ ]}
+
+ All supported data types:
+ {"schema":[
+ {"dataType":{"type":"date"},"name":"","nullable":false},
+ {"dataType":{"type":"datetime"},"name":"","nullable":false},
+ {"dataType":{"type":"time"},"name":"","nullable":false},
+ {"dataType":{"type":"timestamp"},"name":"","nullable":false},
+ {"dataType":{"type":"timestamp_ltz"},"name":"","nullable":false},
+ {"dataType":{"type":"timestamp_ntz"},"name":"","nullable":false},
+ {"dataType":{"type":"timestamp_tz"},"name":"","nullable":false},
+ {"dataType":{"type":"boolean"},"name":"","nullable":false},
+ {"dataType":{"type":"decimal","precision":38,"scale":1},"name":"","nullable":true},
+ {"dataType":{"type":"double"},"name":"","nullable":false},
+ {"dataType":{"type":"float"},"name":"","nullable":false},
+ {"dataType":{"type":"integer","precision":38,"scale":0},"name":"","nullable":false},
+ {"dataType":{"type":"number","precision":38,"scale":1},"name":"","nullable":false},
+ {"dataType":{"type":"numeric","precision":38,"scale":2},"name":"","nullable":false},
+ {"dataType":{"type":"real"},"name":"","nullable":false},
+ {"dataType":{"type":"array"},"name":"","nullable":false},
+ {"dataType":{"type":"object"},"name":"","nullable":false},
+ {"dataType":{"type":"variant"},"name":"","nullable":true},
+ {"dataType":{"type":"binary","size":null},"name":"","nullable":false},
+ {"dataType":{"type":"char","length":1},"name":"","nullable":false},
+ {"dataType":{"type":"string","length":null},"name":"","nullable":false},
+ {"dataType":{"type":"text","length":null},"name":"","nullable":false},
+ {"dataType":{"type":"varbinary","size":null},"name":"","nullable":false},
+ {"dataType":{"type":"varchar","length":100},"name":"","nullable":false}]
+ }
+ """
+ verify_credentials(
+ username,
+ password,
+ private_key_path,
+ private_key_passphrase,
+ o_auth_token,
+ )
+ WriteDisposition.VerifyParam(write_disposition)
+ CreateDisposition.VerifyParam(create_disposition)
+
+ self.params = WriteToSnowflakeSchema(
+ server_name=server_name,
+ schema=schema,
+ database=database,
+ staging_bucket_name=staging_bucket_name,
+ storage_integration_name=storage_integration_name,
+ create_disposition=create_disposition,
+ write_disposition=write_disposition,
+ table_schema=table_schema,
+ username=username,
+ password=password,
+ private_key_path=private_key_path,
+ private_key_passphrase=private_key_passphrase,
+ o_auth_token=o_auth_token,
+ table=table,
+ query=query,
+ role=role,
+ warehouse=warehouse,
+ )
+ self.user_data_mapper = user_data_mapper
+ self.expansion_service = expansion_service or default_io_expansion_service()
+
+ def expand(self, pbegin):
+ return (
+ pbegin
+ | 'User data mapper' >> beam.Map(
+ self.user_data_mapper).with_output_types(List[bytes])
+ | ExternalTransform(
+ self.URN,
+ NamedTupleBasedPayloadBuilder(self.params),
+ self.expansion_service))
+
+
+class CreateDisposition:
+ """
+ Enum class for possible values of create dispositions:
+ CREATE_IF_NEEDED: default behaviour. The write operation checks whether
+ the specified target table exists; if it does not, the write operation
+ attempts to create the table. Specify the schema for the target table
+ using the table_schema parameter.
+ CREATE_NEVER: The write operation fails if the target table does not exist.
+ """
+ CREATE_IF_NEEDED = 'CREATE_IF_NEEDED'
+ CREATE_NEVER = 'CREATE_NEVER'
+
+ @staticmethod
+ def VerifyParam(field):
+ if field and not hasattr(CreateDisposition, field):
+ raise RuntimeError(
+ 'Create disposition has to be one of the following values: '
+ 'CREATE_IF_NEEDED, CREATE_NEVER. Got: {}'.format(field))
+
+
+class WriteDisposition:
+ """
+ Enum class for possible values of write dispositions:
+ APPEND: Default behaviour. Written data is added to the existing rows
+ in the table,
+ EMPTY: The target table must be empty; otherwise, the write operation fails,
+ TRUNCATE: The write operation deletes all rows from the target table
+ before writing to it.
+ """
+ APPEND = 'APPEND'
+ EMPTY = 'EMPTY'
+ TRUNCATE = 'TRUNCATE'
+
+ @staticmethod
+ def VerifyParam(field):
+ if field and not hasattr(WriteDisposition, field):
+ raise RuntimeError(
+ 'Write disposition has to be one of the following values: '
+ 'APPEND, EMPTY, TRUNCATE. Got: {}'.format(field))
+
+
+def verify_credentials(
+ username, password, private_key_path, private_key_passphrase, o_auth_token):
+ if not (o_auth_token or (username and password) or
+ (username and private_key_path and private_key_passphrase)):
+ raise RuntimeError('Snowflake credentials are not set correctly.')
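
Finally, a read-side counterpart to the write sketch near the top of this mail, again using only names defined in the new apache_beam/io/snowflake.py (connection values and the Person type are placeholders):

    from collections import namedtuple

    import apache_beam as beam
    from apache_beam.io.snowflake import ReadFromSnowflake

    Person = namedtuple('Person', ['name', 'age'])  # illustrative output type

    # Each record arrives as a list of byte strings, one per CSV column;
    # csv_mapper converts it into the user-defined type.
    def csv_mapper(strings_array):
        return Person(strings_array[0].decode('utf-8'), int(strings_array[1]))

    with beam.Pipeline() as p:
        rows = (
            p
            | ReadFromSnowflake(
                server_name='https://account.region.gcp.snowflakecomputing.com',
                schema='PUBLIC',
                database='DB',
                staging_bucket_name='gs://bucket/',
                storage_integration_name='integration',
                csv_mapper=csv_mapper,
                username='user',
                password='secret',
                table='TEST_TABLE'))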