Posted to commits@beam.apache.org by bh...@apache.org on 2022/04/06 23:15:23 UTC
[beam] branch master updated: [BEAM-11936] Fix rawtypes warnings in SnowflakeIO (#17257)
This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a2391149eae [BEAM-11936] Fix rawtypes warnings in SnowflakeIO (#17257)
a2391149eae is described below
commit a2391149eaed5e3e7d86fd4f1f146ab9d6848e30
Author: Kamil Breguła <ka...@snowflake.com>
AuthorDate: Thu Apr 7 01:15:15 2022 +0200
[BEAM-11936] Fix rawtypes warnings in SnowflakeIO (#17257)
* [BEAM-10556] Fix rawtypes warnings in SnowflakeIO
* fixup! [BEAM-10556] Fix rawtypes warnings in SnowflakeIO
---
.../apache/beam/sdk/io/snowflake/SnowflakeIO.java | 167 ++++++++++-----------
.../crosslanguage/SnowflakeTransformRegistrar.java | 1 +
.../io/snowflake/crosslanguage/WriteBuilder.java | 10 +-
.../io/snowflake/data/SnowflakeTableSchema.java | 3 +-
.../services/SnowflakeBatchServiceConfig.java | 2 +-
.../services/SnowflakeBatchServiceImpl.java | 15 +-
...nowflakeService.java => SnowflakeServices.java} | 18 ++-
...lakeService.java => SnowflakeServicesImpl.java} | 15 +-
.../services/SnowflakeStreamingServiceConfig.java | 2 +-
.../services/SnowflakeStreamingServiceImpl.java | 5 +-
.../test/FakeSnowflakeBatchServiceImpl.java | 5 +-
.../snowflake/test/FakeSnowflakeServicesImpl.java} | 20 ++-
.../test/FakeSnowflakeStreamingServiceImpl.java | 7 +-
.../beam/sdk/io/snowflake/test/TestUtils.java | 34 ++---
.../test/unit/data/SnowflakeDataTypeValidTest.java | 5 +-
.../test/unit/read/SnowflakeIOReadTest.java | 46 +++---
.../test/unit/write/CreateDispositionTest.java | 21 ++-
.../unit/write/QueryDispositionLocationTest.java | 17 +--
.../test/unit/write/SchemaDispositionTest.java | 17 +--
.../test/unit/write/SnowflakeIOWriteTest.java | 23 ++-
.../test/unit/write/StreamingWriteTest.java | 19 +--
21 files changed, 217 insertions(+), 235 deletions(-)
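
The heart of the change, summarized before the per-file hunks: the raw generic SnowflakeService<T extends ServiceConfig> interface (and the empty ServiceConfig base class) is replaced by a non-generic SnowflakeServices facade with nested batch and streaming sub-interfaces, which is what lets the "rawtypes" suppressions be dropped. A condensed sketch of the new shape, as introduced in SnowflakeServices.java below:

    // New facade: one serializable entry point, two typed sub-services.
    public interface SnowflakeServices extends Serializable {
      BatchService getBatchService();
      StreamingService getStreamingService();

      interface BatchService {
        String read(SnowflakeBatchServiceConfig config) throws Exception;
        void write(SnowflakeBatchServiceConfig config) throws Exception;
      }

      interface StreamingService {
        String read(SnowflakeStreamingServiceConfig config) throws Exception;
        void write(SnowflakeStreamingServiceConfig config) throws Exception;
      }
    }

SnowflakeServicesImpl wires these to the existing SnowflakeBatchServiceImpl and SnowflakeStreamingServiceImpl, and FakeSnowflakeServicesImpl does the same for the test fakes.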
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
index 90bc9ee188b..44180bcd534 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
@@ -56,9 +56,9 @@ import org.apache.beam.sdk.io.snowflake.enums.StreamingLogLevel;
import org.apache.beam.sdk.io.snowflake.enums.WriteDisposition;
import org.apache.beam.sdk.io.snowflake.services.SnowflakeBatchServiceConfig;
import org.apache.beam.sdk.io.snowflake.services.SnowflakeBatchServiceImpl;
-import org.apache.beam.sdk.io.snowflake.services.SnowflakeService;
+import org.apache.beam.sdk.io.snowflake.services.SnowflakeServices;
+import org.apache.beam.sdk.io.snowflake.services.SnowflakeServicesImpl;
import org.apache.beam.sdk.io.snowflake.services.SnowflakeStreamingServiceConfig;
-import org.apache.beam.sdk.io.snowflake.services.SnowflakeStreamingServiceImpl;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
@@ -83,8 +83,8 @@ import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
import org.joda.time.Duration;
@@ -176,7 +176,6 @@ import org.slf4j.LoggerFactory;
*/
@Experimental
@SuppressWarnings({
- "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class SnowflakeIO {
@@ -192,14 +191,15 @@ public class SnowflakeIO {
static final Duration DEFAULT_SLEEP_STREAMING_LOGS = Duration.standardSeconds(5000);
/**
- * Read data from Snowflake via COPY statement using user-defined {@link SnowflakeService}.
+ * Read data from Snowflake via COPY statement using user-defined {@link SnowflakeServices}.
*
- * @param snowflakeService user-defined {@link SnowflakeService}
+ * @param snowflakeServices user-defined {@link SnowflakeServices}
* @param <T> Type of the data to be read.
*/
- public static <T> Read<T> read(SnowflakeService snowflakeService) {
+ @VisibleForTesting
+ public static <T> Read<T> read(SnowflakeServices snowflakeServices) {
return new AutoValue_SnowflakeIO_Read.Builder<T>()
- .setSnowflakeService(snowflakeService)
+ .setSnowflakeServices(snowflakeServices)
.setQuotationMark(ValueProvider.StaticValueProvider.of(CSV_QUOTE_CHAR))
.build();
}
@@ -210,7 +210,7 @@ public class SnowflakeIO {
* @param <T> Type of the data to be read.
*/
public static <T> Read<T> read() {
- return read(new SnowflakeBatchServiceImpl());
+ return read(new SnowflakeServicesImpl());
}
/**
@@ -255,7 +255,6 @@ public class SnowflakeIO {
/** Implementation of {@link #read()}. */
@AutoValue
@AutoValue.CopyAnnotations
- @SuppressWarnings({"rawtypes"})
public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
abstract @Nullable SerializableFunction<Void, DataSource> getDataSourceProviderFn();
@@ -276,7 +275,7 @@ public class SnowflakeIO {
abstract @Nullable Coder<T> getCoder();
- abstract @Nullable SnowflakeService getSnowflakeService();
+ abstract @Nullable SnowflakeServices getSnowflakeServices();
@Nullable
abstract ValueProvider<String> getQuotationMark();
@@ -300,7 +299,7 @@ public class SnowflakeIO {
abstract Builder<T> setCoder(Coder<T> coder);
- abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+ abstract Builder<T> setSnowflakeServices(SnowflakeServices snowflakeServices);
abstract Builder<T> setQuotationMark(ValueProvider<String> quotationMark);
@@ -439,7 +438,7 @@ public class SnowflakeIO {
getStorageIntegrationName(),
getStagingBucketName(),
tmpDirName,
- getSnowflakeService(),
+ getSnowflakeServices(),
getQuotationMark())))
.apply(Reshuffle.viaRandomKey())
.apply(FileIO.matchAll())
@@ -493,7 +492,7 @@ public class SnowflakeIO {
private final ValueProvider<String> storageIntegrationName;
private final ValueProvider<String> stagingBucketDir;
private final String tmpDirName;
- private final SnowflakeService snowflakeService;
+ private final SnowflakeServices snowflakeServices;
private final ValueProvider<String> quotationMark;
private CopyIntoStageFn(
@@ -503,13 +502,13 @@ public class SnowflakeIO {
ValueProvider<String> storageIntegrationName,
ValueProvider<String> stagingBucketDir,
String tmpDirName,
- SnowflakeService snowflakeService,
+ SnowflakeServices snowflakeServices,
ValueProvider<String> quotationMark) {
this.dataSourceProviderFn = dataSourceProviderFn;
this.query = query;
this.table = table;
this.storageIntegrationName = storageIntegrationName;
- this.snowflakeService = snowflakeService;
+ this.snowflakeServices = snowflakeServices;
this.quotationMark = quotationMark;
this.stagingBucketDir = stagingBucketDir;
this.tmpDirName = tmpDirName;
@@ -545,7 +544,7 @@ public class SnowflakeIO {
stagingBucketRunDir,
quotationMark.get());
- String output = snowflakeService.read(config);
+ String output = snowflakeServices.getBatchService().read(config);
context.output(output);
}
@@ -634,7 +633,6 @@ public class SnowflakeIO {
/** Implementation of {@link #write()}. */
@AutoValue
@AutoValue.CopyAnnotations
- @SuppressWarnings({"rawtypes"})
public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
abstract @Nullable SerializableFunction<Void, DataSource> getDataSourceProviderFn();
@@ -673,13 +671,13 @@ public class SnowflakeIO {
abstract CreateDisposition getCreateDisposition();
@Nullable
- abstract UserDataMapper getUserDataMapper();
+ abstract UserDataMapper<T> getUserDataMapper();
@Nullable
abstract SnowflakeTableSchema getTableSchema();
@Nullable
- abstract SnowflakeService getSnowflakeService();
+ abstract SnowflakeServices getSnowflakeServices();
@Nullable
abstract String getQuotationMark();
@@ -712,7 +710,7 @@ public class SnowflakeIO {
abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
- abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+ abstract Builder<T> setUserDataMapper(UserDataMapper<T> userDataMapper);
abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
@@ -720,7 +718,7 @@ public class SnowflakeIO {
abstract Builder<T> setTableSchema(SnowflakeTableSchema tableSchema);
- abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+ abstract Builder<T> setSnowflakeServices(SnowflakeServices snowflakeServices);
abstract Builder<T> setQuotationMark(String quotationMark);
@@ -823,7 +821,7 @@ public class SnowflakeIO {
*
* @param userDataMapper an instance of {@link UserDataMapper}.
*/
- public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+ public Write<T> withUserDataMapper(UserDataMapper<T> userDataMapper) {
return toBuilder().setUserDataMapper(userDataMapper).build();
}
@@ -935,12 +933,13 @@ public class SnowflakeIO {
}
/**
- * A snowflake service {@link SnowflakeService} implementation which is supposed to be used.
+ * A snowflake service {@link SnowflakeServices} implementation which is supposed to be used.
*
- * @param snowflakeService an instance of {@link SnowflakeService}.
+ * @param snowflakeServices an instance of {@link SnowflakeServices}.
*/
- public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
- return toBuilder().setSnowflakeService(snowflakeService).build();
+ @VisibleForTesting
+ public Write<T> withSnowflakeServices(SnowflakeServices snowflakeServices) {
+ return toBuilder().setSnowflakeServices(snowflakeServices).build();
}
/**
@@ -971,7 +970,7 @@ public class SnowflakeIO {
public PDone expand(PCollection<T> input) {
checkArguments(input);
- PCollection out;
+ PCollection<Void> out;
if (getSnowPipe() != null) {
out = writeStream(input, getStagingBucketName());
@@ -979,8 +978,6 @@ public class SnowflakeIO {
out = writeBatch(input, getStagingBucketName());
}
- out.setCoder(StringUtf8Coder.of());
-
return PDone.in(out.getPipeline());
}
@@ -1004,12 +1001,10 @@ public class SnowflakeIO {
}
}
- private PCollection<T> writeStream(
+ private PCollection<Void> writeStream(
PCollection<T> input, ValueProvider<String> stagingBucketDir) {
- SnowflakeService snowflakeService =
- getSnowflakeService() != null
- ? getSnowflakeService()
- : new SnowflakeStreamingServiceImpl();
+ SnowflakeServices snowflakeServices =
+ getSnowflakeServices() != null ? getSnowflakeServices() : new SnowflakeServicesImpl();
/* Ensure that files will be created after specific record count or duration specified */
PCollection<T> inputInGlobalWindow =
@@ -1025,50 +1020,44 @@ public class SnowflakeIO {
.discardingFiredPanes());
int shards = (getShardsNumber() > 0) ? getShardsNumber() : DEFAULT_STREAMING_SHARDS_NUMBER;
- PCollection files = writeFiles(inputInGlobalWindow, stagingBucketDir, shards);
+ PCollection<String> files = writeFiles(inputInGlobalWindow, stagingBucketDir, shards);
/* Ensuring that files will be ingested after flush time */
files =
- (PCollection)
- files.apply(
- "Apply User Trigger",
- Window.<T>into(new GlobalWindows())
- .triggering(
- Repeatedly.forever(
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(getFlushTimeLimit())))
- .discardingFiredPanes());
- files =
- (PCollection)
- files.apply(
- "Create list of files for loading via SnowPipe",
- Combine.globally(new Concatenate()).withoutDefaults());
+ files.apply(
+ "Apply User Trigger",
+ Window.<String>into(new GlobalWindows())
+ .triggering(
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(getFlushTimeLimit())))
+ .discardingFiredPanes());
+ PCollection<List<String>> filesConcatenated =
+ files.apply(
+ "Create list of files for loading via SnowPipe",
+ Combine.globally(new Concatenate()).withoutDefaults());
- return (PCollection)
- files.apply("Stream files to table", streamToTable(snowflakeService, stagingBucketDir));
+ return filesConcatenated.apply(
+ "Stream files to table", streamToTable(snowflakeServices, stagingBucketDir));
}
- private PCollection writeBatch(PCollection input, ValueProvider<String> stagingBucketDir) {
- SnowflakeService snowflakeService =
- getSnowflakeService() != null ? getSnowflakeService() : new SnowflakeBatchServiceImpl();
+ private PCollection<Void> writeBatch(
+ PCollection<T> input, ValueProvider<String> stagingBucketDir) {
+ SnowflakeServices snowflakeServices =
+ getSnowflakeServices() != null ? getSnowflakeServices() : new SnowflakeServicesImpl();
PCollection<String> files = writeBatchFiles(input, stagingBucketDir);
// Combining PCollection of files as a side input into one list of files
ListCoder<String> coder = ListCoder.of(StringUtf8Coder.of());
- files =
- (PCollection)
- files
- .getPipeline()
- .apply(
- Reify.viewInGlobalWindow(
- (PCollectionView) files.apply(View.asList()), coder));
+ PCollection<List<String>> reifiedFiles =
+ files.getPipeline().apply(Reify.viewInGlobalWindow(files.apply(View.asList()), coder));
- return (PCollection)
- files.apply("Copy files to table", copyToTable(snowflakeService, stagingBucketDir));
+ return reifiedFiles.apply(
+ "Copy files to table", copyToTable(snowflakeServices, stagingBucketDir));
}
- private PCollection writeBatchFiles(
+ private PCollection<String> writeBatchFiles(
PCollection<T> input, ValueProvider<String> outputDirectory) {
int shards = (getShardsNumber() > 0) ? getShardsNumber() : DEFAULT_BATCH_SHARDS_NUMBER;
return writeFiles(input, outputDirectory, shards);
@@ -1092,7 +1081,7 @@ public class SnowflakeIO {
ParDo.of(new MapObjectsArrayToCsvFn(getQuotationMark())))
.setCoder(StringUtf8Coder.of());
- WriteFilesResult filesResult =
+ WriteFilesResult<Void> filesResult =
mappedUserData.apply(
"Write files to specified location",
FileIO.<String>write()
@@ -1103,16 +1092,15 @@ public class SnowflakeIO {
.withNumShards(numShards)
.withCompression(Compression.GZIP));
- return (PCollection)
- filesResult
- .getPerDestinationOutputFilenames()
- .apply("Parse KV filenames to Strings", Values.<String>create());
+ return filesResult
+ .getPerDestinationOutputFilenames()
+ .apply("Parse KV filenames to Strings", Values.<String>create());
}
- private ParDo.SingleOutput<Object, Object> copyToTable(
- SnowflakeService snowflakeService, ValueProvider<String> stagingBucketDir) {
+ private ParDo.SingleOutput<List<String>, Void> copyToTable(
+ SnowflakeServices snowflakeServices, ValueProvider<String> stagingBucketDir) {
return ParDo.of(
- new CopyToTableFn<>(
+ new CopyToTableFn(
getDataSourceProviderFn(),
getTable(),
getQuery(),
@@ -1121,19 +1109,19 @@ public class SnowflakeIO {
getCreateDisposition(),
getWriteDisposition(),
getTableSchema(),
- snowflakeService,
+ snowflakeServices,
getQuotationMark()));
}
- protected PTransform streamToTable(
- SnowflakeService snowflakeService, ValueProvider<String> stagingBucketDir) {
+ protected ParDo.SingleOutput<List<String>, Void> streamToTable(
+ SnowflakeServices snowflakeServices, ValueProvider<String> stagingBucketDir) {
return ParDo.of(
new StreamToTableFn(
getDataSourceProviderFn(),
getSnowPipe(),
stagingBucketDir,
getDebugMode(),
- snowflakeService));
+ snowflakeServices));
}
}
@@ -1206,7 +1194,7 @@ public class SnowflakeIO {
}
}
- private static class CopyToTableFn<ParameterT, OutputT> extends DoFn<ParameterT, OutputT> {
+ private static class CopyToTableFn extends DoFn<List<String>, Void> {
private final SerializableFunction<Void, DataSource> dataSourceProviderFn;
private final ValueProvider<String> table;
private final ValueProvider<String> database;
@@ -1218,7 +1206,7 @@ public class SnowflakeIO {
private final ValueProvider<String> storageIntegrationName;
private final WriteDisposition writeDisposition;
private final CreateDisposition createDisposition;
- private final SnowflakeService snowflakeService;
+ private final SnowflakeServices snowflakeServices;
CopyToTableFn(
SerializableFunction<Void, DataSource> dataSourceProviderFn,
@@ -1229,7 +1217,7 @@ public class SnowflakeIO {
CreateDisposition createDisposition,
WriteDisposition writeDisposition,
SnowflakeTableSchema tableSchema,
- SnowflakeService snowflakeService,
+ SnowflakeServices snowflakeServices,
String quotationMark) {
this.dataSourceProviderFn = dataSourceProviderFn;
this.query = query;
@@ -1239,7 +1227,7 @@ public class SnowflakeIO {
this.storageIntegrationName = storageIntegrationName;
this.writeDisposition = writeDisposition;
this.createDisposition = createDisposition;
- this.snowflakeService = snowflakeService;
+ this.snowflakeServices = snowflakeServices;
this.quotationMark = quotationMark;
DataSourceProviderFromDataSourceConfiguration dataSourceProviderFromDataSourceConfiguration =
@@ -1260,7 +1248,7 @@ public class SnowflakeIO {
SnowflakeBatchServiceConfig config =
new SnowflakeBatchServiceConfig(
dataSourceProviderFn,
- (List<String>) context.element(),
+ context.element(),
tableSchema,
databaseValue,
schemaValue,
@@ -1271,17 +1259,17 @@ public class SnowflakeIO {
storageIntegrationName.get(),
stagingBucketDir.get(),
quotationMark);
- snowflakeService.write(config);
+ snowflakeServices.getBatchService().write(config);
}
}
/** Custom DoFn that streams data to Snowflake table. */
- private static class StreamToTableFn<ParameterT, OutputT> extends DoFn<ParameterT, OutputT> {
+ private static class StreamToTableFn extends DoFn<List<String>, Void> {
private final SerializableFunction<Void, DataSource> dataSourceProviderFn;
private final ValueProvider<String> stagingBucketDir;
private final ValueProvider<String> snowPipe;
private final StreamingLogLevel debugMode;
- private final SnowflakeService snowflakeService;
+ private final SnowflakeServices snowflakeServices;
private transient SimpleIngestManager ingestManager;
ArrayList<String> trackedFilesNames;
@@ -1291,12 +1279,12 @@ public class SnowflakeIO {
ValueProvider<String> snowPipe,
ValueProvider<String> stagingBucketDir,
StreamingLogLevel debugMode,
- SnowflakeService snowflakeService) {
+ SnowflakeServices snowflakeServices) {
this.dataSourceProviderFn = dataSourceProviderFn;
this.stagingBucketDir = stagingBucketDir;
this.snowPipe = snowPipe;
this.debugMode = debugMode;
- this.snowflakeService = snowflakeService;
+ this.snowflakeServices = snowflakeServices;
trackedFilesNames = new ArrayList<>();
}
@@ -1335,7 +1323,7 @@ public class SnowflakeIO {
@ProcessElement
public void processElement(ProcessContext context) throws Exception {
- List<String> filesList = (List<String>) context.element();
+ List<String> filesList = context.element();
if (debugMode != null) {
trackedFilesNames.addAll(filesList);
@@ -1343,7 +1331,7 @@ public class SnowflakeIO {
SnowflakeStreamingServiceConfig config =
new SnowflakeStreamingServiceConfig(
filesList, this.stagingBucketDir.get(), this.ingestManager);
- snowflakeService.write(config);
+ snowflakeServices.getStreamingService().write(config);
}
@FinishBundle
@@ -1395,7 +1383,6 @@ public class SnowflakeIO {
*/
@AutoValue
@AutoValue.CopyAnnotations
- @SuppressWarnings({"rawtypes"})
public abstract static class DataSourceConfiguration implements Serializable {
@Nullable
public abstract String getUrl();
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeTransformRegistrar.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeTransformRegistrar.java
index adc19201c8b..76bfb575976 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeTransformRegistrar.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeTransformRegistrar.java
@@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
+import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/WriteBuilder.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/WriteBuilder.java
index 89233f0937d..ba21f414878 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/WriteBuilder.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/WriteBuilder.java
@@ -36,7 +36,8 @@ import org.apache.beam.sdk.values.PDone;
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class WriteBuilder
- implements ExternalTransformBuilder<WriteBuilder.Configuration, PCollection<byte[]>, PDone> {
+ implements ExternalTransformBuilder<
+ WriteBuilder.Configuration, PCollection<List<byte[]>>, PDone> {
/** Parameters class to expose the transform to an external SDK. */
public static class Configuration extends CrossLanguageConfiguration {
@@ -76,8 +77,8 @@ public class WriteBuilder
}
@Override
- public PTransform<PCollection<byte[]>, PDone> buildExternal(Configuration c) {
- return SnowflakeIO.<byte[]>write()
+ public PTransform<PCollection<List<byte[]>>, PDone> buildExternal(Configuration c) {
+ return SnowflakeIO.<List<byte[]>>write()
.withDataSourceConfiguration(c.getDataSourceConfiguration())
.withStorageIntegrationName(c.getStorageIntegrationName())
.withStagingBucketName(c.getStagingBucketName())
@@ -90,7 +91,6 @@ public class WriteBuilder
}
private static SnowflakeIO.UserDataMapper<List<byte[]>> getStringCsvMapper() {
- return (SnowflakeIO.UserDataMapper<List<byte[]>>)
- recordLine -> recordLine.stream().map(String::new).toArray();
+ return recordLine -> recordLine.stream().map(String::new).toArray();
}
}
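
Note on the mapper change above: since UserDataMapper is now carried with its type parameter (UserDataMapper<T> throughout Write<T>), the lambda no longer needs an explicit cast; the same cast removal recurs in TestUtils and SnowflakeIOReadTest further down. For illustration only, the before/after shape of such a mapper (mirroring getStringCsvMapper above; variable names are hypothetical):

    // Before: the lambda carried an explicit cast to the mapper interface.
    SnowflakeIO.UserDataMapper<List<byte[]>> before =
        (SnowflakeIO.UserDataMapper<List<byte[]>>)
            recordLine -> recordLine.stream().map(String::new).toArray();

    // After: the declared target type already fixes the type parameter, so a bare lambda suffices.
    SnowflakeIO.UserDataMapper<List<byte[]>> after =
        recordLine -> recordLine.stream().map(String::new).toArray();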
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/data/SnowflakeTableSchema.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/data/SnowflakeTableSchema.java
index 095feb8c324..f47ea9fc339 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/data/SnowflakeTableSchema.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/data/SnowflakeTableSchema.java
@@ -22,11 +22,12 @@ import java.util.ArrayList;
import java.util.List;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonProperty;
import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
+import org.apache.beam.sdk.io.snowflake.enums.CreateDisposition;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
/**
* POJO representing schema of Table in Snowflake. Used by {@link SnowflakeIO.Write} when {@link
- * SnowflakeIO.Write.CreateDisposition#CREATE_IF_NEEDED} disposition is used.
+ * CreateDisposition#CREATE_IF_NEEDED} disposition is used.
*/
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeBatchServiceConfig.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeBatchServiceConfig.java
index d9f018bea50..36819e196aa 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeBatchServiceConfig.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeBatchServiceConfig.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
-public class SnowflakeBatchServiceConfig extends ServiceConfig {
+public class SnowflakeBatchServiceConfig {
private final SerializableFunction<Void, DataSource> dataSourceProviderFn;
private final String database;
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeBatchServiceImpl.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeBatchServiceImpl.java
index 2700d8bfaa3..279e8fbb808 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeBatchServiceImpl.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeBatchServiceImpl.java
@@ -34,12 +34,11 @@ import org.apache.beam.sdk.io.snowflake.enums.CreateDisposition;
import org.apache.beam.sdk.io.snowflake.enums.WriteDisposition;
import org.apache.beam.sdk.transforms.SerializableFunction;
-/** Implemenation of {@link SnowflakeService} used in production. */
+/** Implemenation of {@link SnowflakeServices.BatchService} used in production. */
@SuppressWarnings({
- "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
-public class SnowflakeBatchServiceImpl implements SnowflakeService<SnowflakeBatchServiceConfig> {
+public class SnowflakeBatchServiceImpl implements SnowflakeServices.BatchService {
private static final String SNOWFLAKE_GCS_PREFIX = "gcs://";
private static final String GCS_PREFIX = "gs://";
@@ -168,7 +167,7 @@ public class SnowflakeBatchServiceImpl implements SnowflakeService<SnowflakeBatc
selectQuery,
resultSet -> {
assert resultSet != null;
- checkIfTableIsEmpty((ResultSet) resultSet);
+ checkIfTableIsEmpty(resultSet);
});
}
@@ -234,7 +233,7 @@ public class SnowflakeBatchServiceImpl implements SnowflakeService<SnowflakeBatc
query,
resultSet -> {
assert resultSet != null;
- if (!checkResultIfTableExists((ResultSet) resultSet)) {
+ if (!checkResultIfTableExists(resultSet)) {
try {
createTable(dataSource, table, tableSchema);
} catch (SQLException e) {
@@ -271,13 +270,15 @@ public class SnowflakeBatchServiceImpl implements SnowflakeService<SnowflakeBatc
}
private static void runConnectionWithStatement(
- DataSource dataSource, String query, Consumer resultSetMethod) throws SQLException {
+ DataSource dataSource, String query, Consumer<ResultSet> resultSetMethod)
+ throws SQLException {
Connection connection = dataSource.getConnection();
runStatement(query, connection, resultSetMethod);
connection.close();
}
- private static void runStatement(String query, Connection connection, Consumer resultSetMethod)
+ private static void runStatement(
+ String query, Connection connection, Consumer<ResultSet> resultSetMethod)
throws SQLException {
PreparedStatement statement = connection.prepareStatement(query);
try {
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeService.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeServices.java
similarity index 66%
copy from sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeService.java
copy to sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeServices.java
index 192856ec477..61b1df21d0e 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeService.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeServices.java
@@ -20,8 +20,20 @@ package org.apache.beam.sdk.io.snowflake.services;
import java.io.Serializable;
/** Interface which defines common methods for interacting with Snowflake. */
-public interface SnowflakeService<T extends ServiceConfig> extends Serializable {
- String read(T config) throws Exception;
+public interface SnowflakeServices extends Serializable {
+ BatchService getBatchService();
- void write(T config) throws Exception;
+ StreamingService getStreamingService();
+
+ interface BatchService {
+ String read(SnowflakeBatchServiceConfig config) throws Exception;
+
+ void write(SnowflakeBatchServiceConfig config) throws Exception;
+ }
+
+ interface StreamingService {
+ String read(SnowflakeStreamingServiceConfig config) throws Exception;
+
+ void write(SnowflakeStreamingServiceConfig config) throws Exception;
+ }
}
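
Downstream in SnowflakeIO.java (see the CopyIntoStageFn, CopyToTableFn and StreamToTableFn hunks above), callers now reach the concrete service through the facade rather than holding a SnowflakeService reference directly. Roughly, with the config objects assumed already built as in those DoFns (batchConfig/streamingConfig are illustrative names):

    // Batch path: COPY INTO stage / table.
    String stagedFiles = snowflakeServices.getBatchService().read(batchConfig);
    snowflakeServices.getBatchService().write(batchConfig);

    // Streaming path: SnowPipe ingestion.
    snowflakeServices.getStreamingService().write(streamingConfig);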
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeService.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeServicesImpl.java
similarity index 75%
rename from sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeService.java
rename to sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeServicesImpl.java
index 192856ec477..a8ccf13ff48 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeService.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeServicesImpl.java
@@ -17,11 +17,14 @@
*/
package org.apache.beam.sdk.io.snowflake.services;
-import java.io.Serializable;
+public class SnowflakeServicesImpl implements SnowflakeServices {
+ @Override
+ public BatchService getBatchService() {
+ return new SnowflakeBatchServiceImpl();
+ }
-/** Interface which defines common methods for interacting with Snowflake. */
-public interface SnowflakeService<T extends ServiceConfig> extends Serializable {
- String read(T config) throws Exception;
-
- void write(T config) throws Exception;
+ @Override
+ public StreamingService getStreamingService() {
+ return new SnowflakeStreamingServiceImpl();
+ }
}
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeStreamingServiceConfig.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeStreamingServiceConfig.java
index 7039c899133..d8bb53c3f50 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeStreamingServiceConfig.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeStreamingServiceConfig.java
@@ -21,7 +21,7 @@ import java.util.List;
import net.snowflake.ingest.SimpleIngestManager;
/** Class for preparing configuration for streaming write. */
-public class SnowflakeStreamingServiceConfig extends ServiceConfig {
+public class SnowflakeStreamingServiceConfig {
private final SimpleIngestManager ingestManager;
private final List<String> filesList;
private final String stagingBucketDir;
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeStreamingServiceImpl.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeStreamingServiceImpl.java
index 2634a17782c..73efef4a14e 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeStreamingServiceImpl.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeStreamingServiceImpl.java
@@ -25,12 +25,11 @@ import java.util.stream.Collectors;
import net.snowflake.ingest.SimpleIngestManager;
import net.snowflake.ingest.connection.IngestResponseException;
-/** Implemenation of {@link SnowflakeService} used in production. */
+/** Implementation of {@link SnowflakeServices.StreamingService} used in production. */
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
-public class SnowflakeStreamingServiceImpl
- implements SnowflakeService<SnowflakeStreamingServiceConfig> {
+public class SnowflakeStreamingServiceImpl implements SnowflakeServices.StreamingService {
private transient SimpleIngestManager ingestManager;
diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeBatchServiceImpl.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeBatchServiceImpl.java
index 90ee4b9d394..d4c4523845b 100644
--- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeBatchServiceImpl.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeBatchServiceImpl.java
@@ -29,11 +29,10 @@ import org.apache.beam.sdk.io.snowflake.data.SnowflakeTableSchema;
import org.apache.beam.sdk.io.snowflake.enums.CreateDisposition;
import org.apache.beam.sdk.io.snowflake.enums.WriteDisposition;
import org.apache.beam.sdk.io.snowflake.services.SnowflakeBatchServiceConfig;
-import org.apache.beam.sdk.io.snowflake.services.SnowflakeService;
+import org.apache.beam.sdk.io.snowflake.services.SnowflakeServices;
/** Fake implementation of {@link SnowflakeService} used in tests. */
-public class FakeSnowflakeBatchServiceImpl
- implements SnowflakeService<SnowflakeBatchServiceConfig> {
+public class FakeSnowflakeBatchServiceImpl implements SnowflakeServices.BatchService {
@Override
public void write(SnowflakeBatchServiceConfig config) throws Exception {
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/ServiceConfig.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeServicesImpl.java
similarity index 66%
rename from sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/ServiceConfig.java
rename to sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeServicesImpl.java
index 1826ce9368e..168741c3562 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/ServiceConfig.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeServicesImpl.java
@@ -15,10 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.io.snowflake.services;
+package org.apache.beam.sdk.io.snowflake.test;
-/**
- * Configuration abstract class for {@link SnowflakeService} that gives parameters for write and
- * read (batch and streaming).
- */
-public abstract class ServiceConfig {}
+import org.apache.beam.sdk.io.snowflake.services.SnowflakeServices;
+
+public class FakeSnowflakeServicesImpl implements SnowflakeServices {
+ @Override
+ public BatchService getBatchService() {
+ return new FakeSnowflakeBatchServiceImpl();
+ }
+
+ @Override
+ public StreamingService getStreamingService() {
+ return new FakeSnowflakeStreamingServiceImpl();
+ }
+}
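
With the fake facade in place, the tests below inject it instead of the per-mode fake services; in outline (pipeline construction elided, names as in the test hunks that follow):

    SnowflakeServices snowflakeServices = new FakeSnowflakeServicesImpl();

    // Read tests pass the facade to the factory method:
    SnowflakeIO.<GenericRecord>read(snowflakeServices)
        .withDataSourceConfiguration(dataSourceConfiguration)
        .fromTable(FAKE_TABLE);

    // Write tests configure it on the transform:
    SnowflakeIO.<Long>write().withSnowflakeServices(snowflakeServices);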
diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeStreamingServiceImpl.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeStreamingServiceImpl.java
index 8dc4694e72a..ac49b6e979b 100644
--- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeStreamingServiceImpl.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeStreamingServiceImpl.java
@@ -25,12 +25,11 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
-import org.apache.beam.sdk.io.snowflake.services.SnowflakeService;
+import org.apache.beam.sdk.io.snowflake.services.SnowflakeServices;
import org.apache.beam.sdk.io.snowflake.services.SnowflakeStreamingServiceConfig;
-/** Fake implementation of {@link SnowflakeService} used in tests. */
-public class FakeSnowflakeStreamingServiceImpl
- implements SnowflakeService<SnowflakeStreamingServiceConfig> {
+/** Fake implementation of {@link SnowflakeServices.StreamingService} used in tests. */
+public class FakeSnowflakeStreamingServiceImpl implements SnowflakeServices.StreamingService {
private FakeSnowflakeIngestManager snowflakeIngestManager;
@Override
diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/TestUtils.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/TestUtils.java
index 8807a972eaf..61853cdb452 100644
--- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/TestUtils.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/TestUtils.java
@@ -48,9 +48,6 @@ import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@SuppressWarnings({
- "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
-})
public class TestUtils {
private static final Logger LOG = LoggerFactory.getLogger(TestUtils.class);
@@ -112,40 +109,37 @@ public class TestUtils {
}
public static SnowflakeIO.UserDataMapper<Long> getCsvMapper() {
- return (SnowflakeIO.UserDataMapper<Long>) recordLine -> new String[] {recordLine.toString()};
+ return recordLine -> new String[] {recordLine.toString()};
}
public static SnowflakeIO.UserDataMapper<KV<String, Long>> getLongCsvMapperKV() {
- return (SnowflakeIO.UserDataMapper<KV<String, Long>>)
- recordLine -> new Long[] {recordLine.getValue()};
+ return recordLine -> new Long[] {recordLine.getValue()};
}
public static SnowflakeIO.UserDataMapper<Long> getLongCsvMapper() {
- return (SnowflakeIO.UserDataMapper<Long>) recordLine -> new Long[] {recordLine};
+ return recordLine -> new Long[] {recordLine};
}
public static SnowflakeIO.CsvMapper<TestRow> getTestRowCsvMapper() {
- return (SnowflakeIO.CsvMapper<TestRow>)
- parts -> TestRow.create(Integer.valueOf(parts[0]), parts[1]);
+ return parts -> TestRow.create(Integer.valueOf(parts[0]), parts[1]);
}
public static SnowflakeIO.UserDataMapper<TestRow> getTestRowDataMapper() {
- return (SnowflakeIO.UserDataMapper<TestRow>)
- (TestRow element) -> new Object[] {element.id(), element.name()};
+ return (TestRow element) -> new Object[] {element.id(), element.name()};
}
public static SnowflakeIO.UserDataMapper<String[]> getLStringCsvMapper() {
- return (SnowflakeIO.UserDataMapper<String[]>) recordLine -> recordLine;
+ return recordLine -> recordLine;
}
public static SnowflakeIO.UserDataMapper<String> getStringCsvMapper() {
- return (SnowflakeIO.UserDataMapper<String>) recordLine -> new String[] {recordLine};
+ return recordLine -> new String[] {recordLine};
}
public static class ParseToKv extends DoFn<Long, KV<String, Long>> {
@ProcessElement
public void processElement(ProcessContext c) {
- KV stringIntKV = KV.of(c.element().toString(), c.element().longValue());
+ KV<String, Long> stringIntKV = KV.of(c.element().toString(), c.element().longValue());
c.output(stringIntKV);
}
}
@@ -167,24 +161,24 @@ public class TestUtils {
return lines;
}
- public static String getValidEncryptedPrivateKeyPath(Class c) {
+ public static String getValidEncryptedPrivateKeyPath(Class<?> c) {
return getPrivateKeyPath(c, VALID_ENCRYPTED_PRIVATE_KEY_FILE_NAME);
}
- public static String getValidUnencryptedPrivateKeyPath(Class c) {
+ public static String getValidUnencryptedPrivateKeyPath(Class<?> c) {
return getPrivateKeyPath(c, VALID_UNENCRYPTED_PRIVATE_KEY_FILE_NAME);
}
- public static String getInvalidPrivateKeyPath(Class c) {
+ public static String getInvalidPrivateKeyPath(Class<?> c) {
return getPrivateKeyPath(c, INVALID_PRIVATE_KEY_FILE_NAME);
}
- public static String getRawValidEncryptedPrivateKey(Class c) throws IOException {
+ public static String getRawValidEncryptedPrivateKey(Class<?> c) throws IOException {
byte[] keyBytes = Files.readAllBytes(Paths.get(getValidEncryptedPrivateKeyPath(c)));
return new String(keyBytes, StandardCharsets.UTF_8);
}
- public static String getRawValidUnencryptedPrivateKey(Class c) throws IOException {
+ public static String getRawValidUnencryptedPrivateKey(Class<?> c) throws IOException {
byte[] keyBytes = Files.readAllBytes(Paths.get(getValidUnencryptedPrivateKeyPath(c)));
return new String(keyBytes, StandardCharsets.UTF_8);
}
@@ -193,7 +187,7 @@ public class TestUtils {
return PRIVATE_KEY_PASSPHRASE;
}
- private static String getPrivateKeyPath(Class c, String path) {
+ private static String getPrivateKeyPath(Class<?> c, String path) {
ClassLoader classLoader = c.getClassLoader();
File file = new File(classLoader.getResource(path).getFile());
return file.getAbsolutePath();
diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/data/SnowflakeDataTypeValidTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/data/SnowflakeDataTypeValidTest.java
index e42a6c712a3..f72c16e270a 100644
--- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/data/SnowflakeDataTypeValidTest.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/data/SnowflakeDataTypeValidTest.java
@@ -51,9 +51,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
-@SuppressWarnings({
- "rawtypes" // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
-})
public class SnowflakeDataTypeValidTest {
private SnowflakeDataType snowflakeDataType;
private String expectedResult;
@@ -64,7 +61,7 @@ public class SnowflakeDataTypeValidTest {
}
@Parameterized.Parameters
- public static Collection primeNumbers() {
+ public static Collection<Object[]> primeNumbers() {
return Arrays.asList(
new Object[][] {
{SnowflakeBoolean.of(), "BOOLEAN"},
diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/read/SnowflakeIOReadTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/read/SnowflakeIOReadTest.java
index 024ef47aac2..10403fda285 100644
--- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/read/SnowflakeIOReadTest.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/read/SnowflakeIOReadTest.java
@@ -26,10 +26,10 @@ import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.AvroGeneratedUser;
import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
-import org.apache.beam.sdk.io.snowflake.services.SnowflakeService;
+import org.apache.beam.sdk.io.snowflake.services.SnowflakeServices;
import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBasicDataSource;
-import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBatchServiceImpl;
import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeDatabase;
+import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeServicesImpl;
import org.apache.beam.sdk.io.snowflake.test.TestSnowflakePipelineOptions;
import org.apache.beam.sdk.io.snowflake.test.TestUtils;
import org.apache.beam.sdk.testing.PAssert;
@@ -45,9 +45,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
-@SuppressWarnings({
- "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
-})
public class SnowflakeIOReadTest implements Serializable {
public static final String FAKE_TABLE = "FAKE_TABLE";
public static final String FAKE_QUERY = "SELECT * FROM FAKE_TABLE";
@@ -60,7 +57,7 @@ public class SnowflakeIOReadTest implements Serializable {
@Rule public transient ExpectedException thrown = ExpectedException.none();
private static SnowflakeIO.DataSourceConfiguration dataSourceConfiguration;
- private static SnowflakeService snowflakeService;
+ private static SnowflakeServices snowflakeServices;
private static List<GenericRecord> avroTestData;
@BeforeClass
@@ -83,7 +80,7 @@ public class SnowflakeIOReadTest implements Serializable {
SnowflakeIO.DataSourceConfiguration.create(new FakeSnowflakeBasicDataSource())
.withServerName(options.getServerName());
- snowflakeService = new FakeSnowflakeBatchServiceImpl();
+ snowflakeServices = new FakeSnowflakeServicesImpl();
}
@AfterClass
@@ -97,7 +94,7 @@ public class SnowflakeIOReadTest implements Serializable {
thrown.expectMessage("withStagingBucketName() is required");
pipeline.apply(
- SnowflakeIO.<GenericRecord>read(snowflakeService)
+ SnowflakeIO.<GenericRecord>read(snowflakeServices)
.withDataSourceConfiguration(dataSourceConfiguration)
.fromTable(FAKE_TABLE)
.withStorageIntegrationName(options.getStorageIntegrationName())
@@ -113,7 +110,7 @@ public class SnowflakeIOReadTest implements Serializable {
thrown.expectMessage("withStorageIntegrationName() is required");
pipeline.apply(
- SnowflakeIO.<GenericRecord>read(snowflakeService)
+ SnowflakeIO.<GenericRecord>read(snowflakeServices)
.withDataSourceConfiguration(dataSourceConfiguration)
.fromTable(FAKE_TABLE)
.withStagingBucketName(options.getStagingBucketName())
@@ -129,7 +126,7 @@ public class SnowflakeIOReadTest implements Serializable {
thrown.expectMessage("withCsvMapper() is required");
pipeline.apply(
- SnowflakeIO.<GenericRecord>read(snowflakeService)
+ SnowflakeIO.<GenericRecord>read(snowflakeServices)
.withDataSourceConfiguration(dataSourceConfiguration)
.fromTable(FAKE_TABLE)
.withStagingBucketName(options.getStagingBucketName())
@@ -145,7 +142,7 @@ public class SnowflakeIOReadTest implements Serializable {
thrown.expectMessage("withCoder() is required");
pipeline.apply(
- SnowflakeIO.<GenericRecord>read(snowflakeService)
+ SnowflakeIO.<GenericRecord>read(snowflakeServices)
.withDataSourceConfiguration(dataSourceConfiguration)
.fromTable(FAKE_TABLE)
.withStagingBucketName(options.getStagingBucketName())
@@ -161,7 +158,7 @@ public class SnowflakeIOReadTest implements Serializable {
thrown.expectMessage("fromTable() or fromQuery() is required");
pipeline.apply(
- SnowflakeIO.<GenericRecord>read(snowflakeService)
+ SnowflakeIO.<GenericRecord>read(snowflakeServices)
.withDataSourceConfiguration(dataSourceConfiguration)
.withStagingBucketName(options.getStagingBucketName())
.withStorageIntegrationName(options.getStorageIntegrationName())
@@ -177,7 +174,7 @@ public class SnowflakeIOReadTest implements Serializable {
thrown.expectMessage("withDataSourceConfiguration() or withDataSourceProviderFn() is required");
pipeline.apply(
- SnowflakeIO.<GenericRecord>read(snowflakeService)
+ SnowflakeIO.<GenericRecord>read(snowflakeServices)
.fromTable(FAKE_TABLE)
.withStagingBucketName(options.getStagingBucketName())
.withStorageIntegrationName(options.getStorageIntegrationName())
@@ -193,7 +190,7 @@ public class SnowflakeIOReadTest implements Serializable {
thrown.expectMessage("fromTable() and fromQuery() are not allowed together");
pipeline.apply(
- SnowflakeIO.<GenericRecord>read(snowflakeService)
+ SnowflakeIO.<GenericRecord>read(snowflakeServices)
.withDataSourceConfiguration(dataSourceConfiguration)
.fromQuery("")
.fromTable(FAKE_TABLE)
@@ -211,7 +208,7 @@ public class SnowflakeIOReadTest implements Serializable {
thrown.expectMessage("SQL compilation error: Table does not exist");
pipeline.apply(
- SnowflakeIO.<GenericRecord>read(snowflakeService)
+ SnowflakeIO.<GenericRecord>read(snowflakeServices)
.withDataSourceConfiguration(dataSourceConfiguration)
.fromTable("NON_EXIST")
.withStagingBucketName(options.getStagingBucketName())
@@ -228,7 +225,7 @@ public class SnowflakeIOReadTest implements Serializable {
thrown.expectMessage("SQL compilation error: Invalid query");
pipeline.apply(
- SnowflakeIO.<GenericRecord>read(snowflakeService)
+ SnowflakeIO.<GenericRecord>read(snowflakeServices)
.withDataSourceConfiguration(dataSourceConfiguration)
.fromQuery("BAD_QUERY")
.withStagingBucketName(options.getStagingBucketName())
@@ -243,7 +240,7 @@ public class SnowflakeIOReadTest implements Serializable {
public void testReadFromTable() {
PCollection<GenericRecord> items =
pipeline.apply(
- SnowflakeIO.<GenericRecord>read(snowflakeService)
+ SnowflakeIO.<GenericRecord>read(snowflakeServices)
.withDataSourceConfiguration(dataSourceConfiguration)
.fromTable(FAKE_TABLE)
.withStagingBucketName(options.getStagingBucketName())
@@ -259,7 +256,7 @@ public class SnowflakeIOReadTest implements Serializable {
public void testReadFromQuery() {
PCollection<GenericRecord> items =
pipeline.apply(
- SnowflakeIO.<GenericRecord>read(snowflakeService)
+ SnowflakeIO.<GenericRecord>read(snowflakeServices)
.withDataSourceConfiguration(dataSourceConfiguration)
.fromQuery(FAKE_QUERY)
.withStagingBucketName(options.getStagingBucketName())
@@ -272,12 +269,11 @@ public class SnowflakeIOReadTest implements Serializable {
}
static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
- return (SnowflakeIO.CsvMapper<GenericRecord>)
- parts ->
- new GenericRecordBuilder(AvroGeneratedUser.getClassSchema())
- .set("name", String.valueOf(parts[0]))
- .set("favorite_number", Integer.valueOf(parts[1]))
- .set("favorite_color", String.valueOf(parts[2]))
- .build();
+ return parts ->
+ new GenericRecordBuilder(AvroGeneratedUser.getClassSchema())
+ .set("name", String.valueOf(parts[0]))
+ .set("favorite_number", Integer.valueOf(parts[1]))
+ .set("favorite_color", String.valueOf(parts[2]))
+ .build();
}
}
diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/CreateDispositionTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/CreateDispositionTest.java
index f91fe31f5fe..f279c086089 100644
--- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/CreateDispositionTest.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/CreateDispositionTest.java
@@ -29,10 +29,10 @@ import org.apache.beam.sdk.io.snowflake.data.SnowflakeColumn;
import org.apache.beam.sdk.io.snowflake.data.SnowflakeTableSchema;
import org.apache.beam.sdk.io.snowflake.data.text.SnowflakeVarchar;
import org.apache.beam.sdk.io.snowflake.enums.CreateDisposition;
-import org.apache.beam.sdk.io.snowflake.services.SnowflakeService;
+import org.apache.beam.sdk.io.snowflake.services.SnowflakeServices;
import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBasicDataSource;
-import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBatchServiceImpl;
import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeDatabase;
+import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeServicesImpl;
import org.apache.beam.sdk.io.snowflake.test.TestSnowflakePipelineOptions;
import org.apache.beam.sdk.io.snowflake.test.TestUtils;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -48,9 +48,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
-@SuppressWarnings({
- "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
-})
public class CreateDispositionTest {
private static final String FAKE_TABLE = "FAKE_TABLE";
private static final String BUCKET_NAME = "bucket/";
@@ -63,7 +60,7 @@ public class CreateDispositionTest {
private static String stagingBucketName;
private static String storageIntegrationName;
- private static SnowflakeService snowflakeService;
+ private static SnowflakeServices snowflakeServices;
private static List<Long> testData;
@BeforeClass
@@ -76,7 +73,7 @@ public class CreateDispositionTest {
stagingBucketName = options.getStagingBucketName();
storageIntegrationName = options.getStorageIntegrationName();
- snowflakeService = new FakeSnowflakeBatchServiceImpl();
+ snowflakeServices = new FakeSnowflakeServicesImpl();
testData = LongStream.range(0, 100).boxed().collect(Collectors.toList());
dc =
@@ -109,7 +106,7 @@ public class CreateDispositionTest {
.withUserDataMapper(TestUtils.getLongCsvMapper())
.withFileNameTemplate("output")
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
- .withSnowflakeService(snowflakeService));
+ .withSnowflakeServices(snowflakeServices));
pipeline.run(options).waitUntilFinish();
@@ -137,7 +134,7 @@ public class CreateDispositionTest {
.withFileNameTemplate("output")
.withUserDataMapper(getCsvMapper())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
- .withSnowflakeService(snowflakeService));
+ .withSnowflakeServices(snowflakeServices));
pipeline.run(options).waitUntilFinish();
}
@@ -161,7 +158,7 @@ public class CreateDispositionTest {
.withFileNameTemplate("output")
.withUserDataMapper(TestUtils.getLongCsvMapper())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
- .withSnowflakeService(snowflakeService));
+ .withSnowflakeServices(snowflakeServices));
pipeline.run(options).waitUntilFinish();
List<Long> actualData = FakeSnowflakeDatabase.getElementsAsLong("NO_EXIST_TABLE");
@@ -185,7 +182,7 @@ public class CreateDispositionTest {
.withFileNameTemplate("output")
.withUserDataMapper(TestUtils.getLongCsvMapper())
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
- .withSnowflakeService(snowflakeService));
+ .withSnowflakeServices(snowflakeServices));
pipeline.run(options).waitUntilFinish();
List<Long> actualData = FakeSnowflakeDatabase.getElementsAsLong(FAKE_TABLE);
@@ -211,7 +208,7 @@ public class CreateDispositionTest {
.withFileNameTemplate("output")
.withUserDataMapper(TestUtils.getLongCsvMapper())
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
- .withSnowflakeService(snowflakeService));
+ .withSnowflakeServices(snowflakeServices));
pipeline.run(options).waitUntilFinish();
}
diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/QueryDispositionLocationTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/QueryDispositionLocationTest.java
index 1aa1ad0b1b9..47f3bfeaa8a 100644
--- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/QueryDispositionLocationTest.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/QueryDispositionLocationTest.java
@@ -26,10 +26,10 @@ import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
import org.apache.beam.sdk.io.snowflake.enums.WriteDisposition;
-import org.apache.beam.sdk.io.snowflake.services.SnowflakeService;
+import org.apache.beam.sdk.io.snowflake.services.SnowflakeServices;
import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBasicDataSource;
-import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBatchServiceImpl;
import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeDatabase;
+import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeServicesImpl;
import org.apache.beam.sdk.io.snowflake.test.TestSnowflakePipelineOptions;
import org.apache.beam.sdk.io.snowflake.test.TestUtils;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -45,9 +45,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
-@SuppressWarnings({
- "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
-})
public class QueryDispositionLocationTest {
private static final String FAKE_TABLE = "FAKE_TABLE";
private static final String BUCKET_NAME = "BUCKET/";
@@ -58,7 +55,7 @@ public class QueryDispositionLocationTest {
private static TestSnowflakePipelineOptions options;
private static SnowflakeIO.DataSourceConfiguration dc;
- private static SnowflakeService snowflakeService;
+ private static SnowflakeServices snowflakeServices;
private static List<Long> testData;
@BeforeClass
@@ -66,7 +63,7 @@ public class QueryDispositionLocationTest {
PipelineOptionsFactory.register(TestSnowflakePipelineOptions.class);
options = TestPipeline.testingPipelineOptions().as(TestSnowflakePipelineOptions.class);
- snowflakeService = new FakeSnowflakeBatchServiceImpl();
+ snowflakeServices = new FakeSnowflakeServicesImpl();
testData = LongStream.range(0, 100).boxed().collect(Collectors.toList());
}
@@ -102,7 +99,7 @@ public class QueryDispositionLocationTest {
.withUserDataMapper(TestUtils.getLongCsvMapper())
.withFileNameTemplate("output")
.withWriteDisposition(WriteDisposition.TRUNCATE)
- .withSnowflakeService(snowflakeService));
+ .withSnowflakeServices(snowflakeServices));
pipeline.run(options).waitUntilFinish();
@@ -131,7 +128,7 @@ public class QueryDispositionLocationTest {
.withUserDataMapper(TestUtils.getLongCsvMapper())
.withFileNameTemplate("output")
.withWriteDisposition(WriteDisposition.EMPTY)
- .withSnowflakeService(snowflakeService));
+ .withSnowflakeServices(snowflakeServices));
pipeline.run(options).waitUntilFinish();
}
@@ -152,7 +149,7 @@ public class QueryDispositionLocationTest {
.withFileNameTemplate("output")
.withUserDataMapper(TestUtils.getLongCsvMapper())
.withWriteDisposition(WriteDisposition.EMPTY)
- .withSnowflakeService(snowflakeService));
+ .withSnowflakeServices(snowflakeServices));
pipeline.run(options).waitUntilFinish();
diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/SchemaDispositionTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/SchemaDispositionTest.java
index 89edc91c411..4073ac8b59e 100644
--- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/SchemaDispositionTest.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/SchemaDispositionTest.java
@@ -35,10 +35,10 @@ import org.apache.beam.sdk.io.snowflake.data.structured.SnowflakeObject;
import org.apache.beam.sdk.io.snowflake.data.structured.SnowflakeVariant;
import org.apache.beam.sdk.io.snowflake.data.text.SnowflakeText;
import org.apache.beam.sdk.io.snowflake.enums.CreateDisposition;
-import org.apache.beam.sdk.io.snowflake.services.SnowflakeService;
+import org.apache.beam.sdk.io.snowflake.services.SnowflakeServices;
import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBasicDataSource;
-import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBatchServiceImpl;
import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeDatabase;
+import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeServicesImpl;
import org.apache.beam.sdk.io.snowflake.test.TestSnowflakePipelineOptions;
import org.apache.beam.sdk.io.snowflake.test.TestUtils;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -54,9 +54,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
-@SuppressWarnings({
- "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
-})
public class SchemaDispositionTest {
private static final String BUCKET_NAME = "BUCKET/";
@@ -68,7 +65,7 @@ public class SchemaDispositionTest {
private static String stagingBucketName;
private static String storageIntegrationName;
- private static SnowflakeService snowflakeService;
+ private static SnowflakeServices snowflakeServices;
@BeforeClass
public static void setupAll() {
@@ -80,7 +77,7 @@ public class SchemaDispositionTest {
stagingBucketName = options.getStagingBucketName();
storageIntegrationName = options.getStorageIntegrationName();
- snowflakeService = new FakeSnowflakeBatchServiceImpl();
+ snowflakeServices = new FakeSnowflakeServicesImpl();
dc =
SnowflakeIO.DataSourceConfiguration.create(new FakeSnowflakeBasicDataSource())
@@ -130,7 +127,7 @@ public class SchemaDispositionTest {
.withFileNameTemplate("output")
.withUserDataMapper(TestUtils.getLStringCsvMapper())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
- .withSnowflakeService(snowflakeService));
+ .withSnowflakeServices(snowflakeServices));
pipeline.run(options).waitUntilFinish();
@@ -168,7 +165,7 @@ public class SchemaDispositionTest {
.withFileNameTemplate("output")
.withUserDataMapper(getCsvMapper())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
- .withSnowflakeService(snowflakeService));
+ .withSnowflakeServices(snowflakeServices));
pipeline.run(options).waitUntilFinish();
@@ -208,7 +205,7 @@ public class SchemaDispositionTest {
.withFileNameTemplate("output")
.withUserDataMapper(getCsvMapper())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
- .withSnowflakeService(snowflakeService));
+ .withSnowflakeServices(snowflakeServices));
pipeline.run(options).waitUntilFinish();
diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/SnowflakeIOWriteTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/SnowflakeIOWriteTest.java
index 185015169d9..f5784284098 100644
--- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/SnowflakeIOWriteTest.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/SnowflakeIOWriteTest.java
@@ -26,10 +26,10 @@ import java.util.stream.Collectors;
import java.util.stream.LongStream;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
-import org.apache.beam.sdk.io.snowflake.services.SnowflakeService;
+import org.apache.beam.sdk.io.snowflake.services.SnowflakeServices;
import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBasicDataSource;
-import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBatchServiceImpl;
import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeDatabase;
+import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeServicesImpl;
import org.apache.beam.sdk.io.snowflake.test.TestSnowflakePipelineOptions;
import org.apache.beam.sdk.io.snowflake.test.TestUtils;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -47,9 +47,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
-@SuppressWarnings({
- "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
-})
public class SnowflakeIOWriteTest {
private static final String FAKE_TABLE = "FAKE_TABLE";
private static final String BUCKET_NAME = "BUCKET/";
@@ -61,13 +58,13 @@ public class SnowflakeIOWriteTest {
private static TestSnowflakePipelineOptions options;
private static SnowflakeIO.DataSourceConfiguration dc;
- private static SnowflakeService snowflakeService;
+ private static SnowflakeServices snowflakeServices;
private static List<Long> testData;
private static List<String> testDataInStrings;
@BeforeClass
public static void setupAll() {
- snowflakeService = new FakeSnowflakeBatchServiceImpl();
+ snowflakeServices = new FakeSnowflakeServicesImpl();
testData = LongStream.range(0, 100).boxed().collect(Collectors.toList());
testDataInStrings = new ArrayList<>();
@@ -112,7 +109,7 @@ public class SnowflakeIOWriteTest {
.to(FAKE_TABLE)
.withStagingBucketName(options.getStagingBucketName())
.withStorageIntegrationName(options.getStorageIntegrationName())
- .withSnowflakeService(snowflakeService));
+ .withSnowflakeServices(snowflakeServices));
pipeline.run(options).waitUntilFinish();
@@ -133,7 +130,7 @@ public class SnowflakeIOWriteTest {
.withStorageIntegrationName(options.getStorageIntegrationName())
.withDataSourceConfiguration(dc)
.withUserDataMapper(TestUtils.getLongCsvMapper())
- .withSnowflakeService(snowflakeService));
+ .withSnowflakeServices(snowflakeServices));
pipeline.run(options).waitUntilFinish();
@@ -155,7 +152,7 @@ public class SnowflakeIOWriteTest {
.to(FAKE_TABLE)
.withStagingBucketName(options.getStagingBucketName())
.withStorageIntegrationName(options.getStorageIntegrationName())
- .withSnowflakeService(snowflakeService));
+ .withSnowflakeServices(snowflakeServices));
pipeline.run(options).waitUntilFinish();
@@ -180,7 +177,7 @@ public class SnowflakeIOWriteTest {
.withUserDataMapper(TestUtils.getLongCsvMapperKV())
.withDataSourceConfiguration(dc)
.withQueryTransformation(query)
- .withSnowflakeService(snowflakeService));
+ .withSnowflakeServices(snowflakeServices));
pipeline.run(options).waitUntilFinish();
@@ -202,7 +199,7 @@ public class SnowflakeIOWriteTest {
.to(FAKE_TABLE)
.withStagingBucketName(options.getStagingBucketName())
.withStorageIntegrationName(options.getStorageIntegrationName())
- .withSnowflakeService(snowflakeService)
+ .withSnowflakeServices(snowflakeServices)
.withQuotationMark("\""));
pipeline.run(options).waitUntilFinish();
@@ -228,7 +225,7 @@ public class SnowflakeIOWriteTest {
.to(FAKE_TABLE)
.withStagingBucketName(options.getStagingBucketName())
.withStorageIntegrationName(options.getStorageIntegrationName())
- .withSnowflakeService(snowflakeService)
+ .withSnowflakeServices(snowflakeServices)
.withQuotationMark(""));
pipeline.run(options).waitUntilFinish();
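Taken together, the SnowflakeIOWriteTest hunks above converge on one pattern: each test now constructs the unified FakeSnowflakeServicesImpl and passes it to the write builder through the renamed withSnowflakeServices method. A minimal sketch of that pattern, assuming the fixtures from the test file (pipeline, dc, options, testData, FAKE_TABLE, TestUtils) and the usual Beam Create/apply scaffolding that this commit does not touch:

    // Unified fake replaces FakeSnowflakeBatchServiceImpl in the batch write tests.
    SnowflakeServices snowflakeServices = new FakeSnowflakeServicesImpl();

    pipeline
        .apply(Create.of(testData))
        .apply(
            SnowflakeIO.<Long>write()
                .withDataSourceConfiguration(dc)
                .to(FAKE_TABLE)
                .withStagingBucketName(options.getStagingBucketName())
                .withStorageIntegrationName(options.getStorageIntegrationName())
                .withUserDataMapper(TestUtils.getLongCsvMapper())
                // Renamed from .withSnowflakeService(snowflakeService).
                .withSnowflakeServices(snowflakeServices));

    pipeline.run(options).waitUntilFinish();

The streaming tests below make the same substitution, so the single fake now also backs the SnowPipe path that previously required FakeSnowflakeStreamingServiceImpl.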
diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/StreamingWriteTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/StreamingWriteTest.java
index a7f2cbe17e9..780aa26356a 100644
--- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/StreamingWriteTest.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/StreamingWriteTest.java
@@ -33,10 +33,10 @@ import java.util.stream.LongStream;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
-import org.apache.beam.sdk.io.snowflake.services.SnowflakeService;
+import org.apache.beam.sdk.io.snowflake.services.SnowflakeServices;
import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBasicDataSource;
import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeDatabase;
-import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeStreamingServiceImpl;
+import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeServicesImpl;
import org.apache.beam.sdk.io.snowflake.test.TestSnowflakePipelineOptions;
import org.apache.beam.sdk.io.snowflake.test.TestUtils;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -58,9 +58,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
-@SuppressWarnings({
- "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
-})
public class StreamingWriteTest {
private static final String FAKE_TABLE = "TEST_TABLE";
@@ -73,7 +70,7 @@ public class StreamingWriteTest {
@Rule public ExpectedException exceptionRule = ExpectedException.none();
private static SnowflakeIO.DataSourceConfiguration dataSourceConfiguration;
- private static SnowflakeService snowflakeService;
+ private static SnowflakeServices snowflakeServices;
private static TestSnowflakePipelineOptions options;
private static List<Long> testData;
@@ -92,7 +89,7 @@ public class StreamingWriteTest {
@BeforeClass
public static void setup() {
- snowflakeService = new FakeSnowflakeStreamingServiceImpl();
+ snowflakeServices = new FakeSnowflakeServicesImpl();
PipelineOptionsFactory.register(TestSnowflakePipelineOptions.class);
options = TestPipeline.testingPipelineOptions().as(TestSnowflakePipelineOptions.class);
@@ -138,7 +135,7 @@ public class StreamingWriteTest {
.withStorageIntegrationName(STORAGE_INTEGRATION_NAME)
.withSnowPipe(SNOW_PIPE)
.withUserDataMapper(TestUtils.getLongCsvMapper())
- .withSnowflakeService(snowflakeService));
+ .withSnowflakeServices(snowflakeServices));
pipeline.run(options);
}
@@ -165,7 +162,7 @@ public class StreamingWriteTest {
.withStorageIntegrationName(STORAGE_INTEGRATION_NAME)
.withSnowPipe(SNOW_PIPE)
.withUserDataMapper(TestUtils.getLongCsvMapper())
- .withSnowflakeService(snowflakeService));
+ .withSnowflakeServices(snowflakeServices));
pipeline.run(options);
}
@@ -211,7 +208,7 @@ public class StreamingWriteTest {
.withFlushRowLimit(4)
.withFlushTimeLimit(WINDOW_DURATION)
.withUserDataMapper(TestUtils.getStringCsvMapper())
- .withSnowflakeService(snowflakeService));
+ .withSnowflakeServices(snowflakeServices));
pipeline.run(options).waitUntilFinish();
@@ -271,7 +268,7 @@ public class StreamingWriteTest {
.withQuotationMark(quotationMark)
.withFlushTimeLimit(WINDOW_DURATION)
.withUserDataMapper(TestUtils.getStringCsvMapper())
- .withSnowflakeService(snowflakeService));
+ .withSnowflakeServices(snowflakeServices));
pipeline.run(options).waitUntilFinish();