You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bh...@apache.org on 2022/04/06 23:15:23 UTC

[beam] branch master updated: [BEAM-11936] Fix rawtypes warnings in SnowflakeIO (#17257)

This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a2391149eae [BEAM-11936] Fix rawtypes warnings in SnowflakeIO (#17257)
a2391149eae is described below

commit a2391149eaed5e3e7d86fd4f1f146ab9d6848e30
Author: Kamil Breguła <ka...@snowflake.com>
AuthorDate: Thu Apr 7 01:15:15 2022 +0200

    [BEAM-11936] Fix rawtypes warnings in SnowflakeIO (#17257)
    
    * [BEAM-10556] Fix rawtypes warnings in SnowflakeIO
    
    * fixup! [BEAM-10556] Fix rawtypes warnings in SnowflakeIO
---
 .../apache/beam/sdk/io/snowflake/SnowflakeIO.java  | 167 ++++++++++-----------
 .../crosslanguage/SnowflakeTransformRegistrar.java |   1 +
 .../io/snowflake/crosslanguage/WriteBuilder.java   |  10 +-
 .../io/snowflake/data/SnowflakeTableSchema.java    |   3 +-
 .../services/SnowflakeBatchServiceConfig.java      |   2 +-
 .../services/SnowflakeBatchServiceImpl.java        |  15 +-
 ...nowflakeService.java => SnowflakeServices.java} |  18 ++-
 ...lakeService.java => SnowflakeServicesImpl.java} |  15 +-
 .../services/SnowflakeStreamingServiceConfig.java  |   2 +-
 .../services/SnowflakeStreamingServiceImpl.java    |   5 +-
 .../test/FakeSnowflakeBatchServiceImpl.java        |   5 +-
 .../snowflake/test/FakeSnowflakeServicesImpl.java} |  20 ++-
 .../test/FakeSnowflakeStreamingServiceImpl.java    |   7 +-
 .../beam/sdk/io/snowflake/test/TestUtils.java      |  34 ++---
 .../test/unit/data/SnowflakeDataTypeValidTest.java |   5 +-
 .../test/unit/read/SnowflakeIOReadTest.java        |  46 +++---
 .../test/unit/write/CreateDispositionTest.java     |  21 ++-
 .../unit/write/QueryDispositionLocationTest.java   |  17 +--
 .../test/unit/write/SchemaDispositionTest.java     |  17 +--
 .../test/unit/write/SnowflakeIOWriteTest.java      |  23 ++-
 .../test/unit/write/StreamingWriteTest.java        |  19 +--
 21 files changed, 217 insertions(+), 235 deletions(-)

diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
index 90bc9ee188b..44180bcd534 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
@@ -56,9 +56,9 @@ import org.apache.beam.sdk.io.snowflake.enums.StreamingLogLevel;
 import org.apache.beam.sdk.io.snowflake.enums.WriteDisposition;
 import org.apache.beam.sdk.io.snowflake.services.SnowflakeBatchServiceConfig;
 import org.apache.beam.sdk.io.snowflake.services.SnowflakeBatchServiceImpl;
-import org.apache.beam.sdk.io.snowflake.services.SnowflakeService;
+import org.apache.beam.sdk.io.snowflake.services.SnowflakeServices;
+import org.apache.beam.sdk.io.snowflake.services.SnowflakeServicesImpl;
 import org.apache.beam.sdk.io.snowflake.services.SnowflakeStreamingServiceConfig;
-import org.apache.beam.sdk.io.snowflake.services.SnowflakeStreamingServiceImpl;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
@@ -83,8 +83,8 @@ import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
 import org.joda.time.Duration;
@@ -176,7 +176,6 @@ import org.slf4j.LoggerFactory;
  */
 @Experimental
 @SuppressWarnings({
-  "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
 public class SnowflakeIO {
@@ -192,14 +191,15 @@ public class SnowflakeIO {
   static final Duration DEFAULT_SLEEP_STREAMING_LOGS = Duration.standardSeconds(5000);
 
   /**
-   * Read data from Snowflake via COPY statement using user-defined {@link SnowflakeService}.
+   * Read data from Snowflake via COPY statement using user-defined {@link SnowflakeServices}.
    *
-   * @param snowflakeService user-defined {@link SnowflakeService}
+   * @param snowflakeServices user-defined {@link SnowflakeServices}
    * @param <T> Type of the data to be read.
    */
-  public static <T> Read<T> read(SnowflakeService snowflakeService) {
+  @VisibleForTesting
+  public static <T> Read<T> read(SnowflakeServices snowflakeServices) {
     return new AutoValue_SnowflakeIO_Read.Builder<T>()
-        .setSnowflakeService(snowflakeService)
+        .setSnowflakeServices(snowflakeServices)
         .setQuotationMark(ValueProvider.StaticValueProvider.of(CSV_QUOTE_CHAR))
         .build();
   }
@@ -210,7 +210,7 @@ public class SnowflakeIO {
    * @param <T> Type of the data to be read.
    */
   public static <T> Read<T> read() {
-    return read(new SnowflakeBatchServiceImpl());
+    return read(new SnowflakeServicesImpl());
   }
 
   /**
@@ -255,7 +255,6 @@ public class SnowflakeIO {
   /** Implementation of {@link #read()}. */
   @AutoValue
   @AutoValue.CopyAnnotations
-  @SuppressWarnings({"rawtypes"})
   public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
 
     abstract @Nullable SerializableFunction<Void, DataSource> getDataSourceProviderFn();
@@ -276,7 +275,7 @@ public class SnowflakeIO {
 
     abstract @Nullable Coder<T> getCoder();
 
-    abstract @Nullable SnowflakeService getSnowflakeService();
+    abstract @Nullable SnowflakeServices getSnowflakeServices();
 
     @Nullable
     abstract ValueProvider<String> getQuotationMark();
@@ -300,7 +299,7 @@ public class SnowflakeIO {
 
       abstract Builder<T> setCoder(Coder<T> coder);
 
-      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+      abstract Builder<T> setSnowflakeServices(SnowflakeServices snowflakeServices);
 
       abstract Builder<T> setQuotationMark(ValueProvider<String> quotationMark);
 
@@ -439,7 +438,7 @@ public class SnowflakeIO {
                           getStorageIntegrationName(),
                           getStagingBucketName(),
                           tmpDirName,
-                          getSnowflakeService(),
+                          getSnowflakeServices(),
                           getQuotationMark())))
               .apply(Reshuffle.viaRandomKey())
               .apply(FileIO.matchAll())
@@ -493,7 +492,7 @@ public class SnowflakeIO {
       private final ValueProvider<String> storageIntegrationName;
       private final ValueProvider<String> stagingBucketDir;
       private final String tmpDirName;
-      private final SnowflakeService snowflakeService;
+      private final SnowflakeServices snowflakeServices;
       private final ValueProvider<String> quotationMark;
 
       private CopyIntoStageFn(
@@ -503,13 +502,13 @@ public class SnowflakeIO {
           ValueProvider<String> storageIntegrationName,
           ValueProvider<String> stagingBucketDir,
           String tmpDirName,
-          SnowflakeService snowflakeService,
+          SnowflakeServices snowflakeServices,
           ValueProvider<String> quotationMark) {
         this.dataSourceProviderFn = dataSourceProviderFn;
         this.query = query;
         this.table = table;
         this.storageIntegrationName = storageIntegrationName;
-        this.snowflakeService = snowflakeService;
+        this.snowflakeServices = snowflakeServices;
         this.quotationMark = quotationMark;
         this.stagingBucketDir = stagingBucketDir;
         this.tmpDirName = tmpDirName;
@@ -545,7 +544,7 @@ public class SnowflakeIO {
                 stagingBucketRunDir,
                 quotationMark.get());
 
-        String output = snowflakeService.read(config);
+        String output = snowflakeServices.getBatchService().read(config);
 
         context.output(output);
       }
@@ -634,7 +633,6 @@ public class SnowflakeIO {
   /** Implementation of {@link #write()}. */
   @AutoValue
   @AutoValue.CopyAnnotations
-  @SuppressWarnings({"rawtypes"})
   public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
 
     abstract @Nullable SerializableFunction<Void, DataSource> getDataSourceProviderFn();
@@ -673,13 +671,13 @@ public class SnowflakeIO {
     abstract CreateDisposition getCreateDisposition();
 
     @Nullable
-    abstract UserDataMapper getUserDataMapper();
+    abstract UserDataMapper<T> getUserDataMapper();
 
     @Nullable
     abstract SnowflakeTableSchema getTableSchema();
 
     @Nullable
-    abstract SnowflakeService getSnowflakeService();
+    abstract SnowflakeServices getSnowflakeServices();
 
     @Nullable
     abstract String getQuotationMark();
@@ -712,7 +710,7 @@ public class SnowflakeIO {
 
       abstract Builder<T> setFileNameTemplate(String fileNameTemplate);
 
-      abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);
+      abstract Builder<T> setUserDataMapper(UserDataMapper<T> userDataMapper);
 
       abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
 
@@ -720,7 +718,7 @@ public class SnowflakeIO {
 
       abstract Builder<T> setTableSchema(SnowflakeTableSchema tableSchema);
 
-      abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);
+      abstract Builder<T> setSnowflakeServices(SnowflakeServices snowflakeServices);
 
       abstract Builder<T> setQuotationMark(String quotationMark);
 
@@ -823,7 +821,7 @@ public class SnowflakeIO {
      *
      * @param userDataMapper an instance of {@link UserDataMapper}.
      */
-    public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
+    public Write<T> withUserDataMapper(UserDataMapper<T> userDataMapper) {
       return toBuilder().setUserDataMapper(userDataMapper).build();
     }
 
@@ -935,12 +933,13 @@ public class SnowflakeIO {
     }
 
     /**
-     * A snowflake service {@link SnowflakeService} implementation which is supposed to be used.
+     * A snowflake service {@link SnowflakeServices} implementation which is supposed to be used.
      *
-     * @param snowflakeService an instance of {@link SnowflakeService}.
+     * @param snowflakeServices an instance of {@link SnowflakeServices}.
      */
-    public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
-      return toBuilder().setSnowflakeService(snowflakeService).build();
+    @VisibleForTesting
+    public Write<T> withSnowflakeServices(SnowflakeServices snowflakeServices) {
+      return toBuilder().setSnowflakeServices(snowflakeServices).build();
     }
 
     /**
@@ -971,7 +970,7 @@ public class SnowflakeIO {
     public PDone expand(PCollection<T> input) {
       checkArguments(input);
 
-      PCollection out;
+      PCollection<Void> out;
 
       if (getSnowPipe() != null) {
         out = writeStream(input, getStagingBucketName());
@@ -979,8 +978,6 @@ public class SnowflakeIO {
         out = writeBatch(input, getStagingBucketName());
       }
 
-      out.setCoder(StringUtf8Coder.of());
-
       return PDone.in(out.getPipeline());
     }
 
@@ -1004,12 +1001,10 @@ public class SnowflakeIO {
       }
     }
 
-    private PCollection<T> writeStream(
+    private PCollection<Void> writeStream(
         PCollection<T> input, ValueProvider<String> stagingBucketDir) {
-      SnowflakeService snowflakeService =
-          getSnowflakeService() != null
-              ? getSnowflakeService()
-              : new SnowflakeStreamingServiceImpl();
+      SnowflakeServices snowflakeServices =
+          getSnowflakeServices() != null ? getSnowflakeServices() : new SnowflakeServicesImpl();
 
       /* Ensure that files will be created after specific record count or duration specified */
       PCollection<T> inputInGlobalWindow =
@@ -1025,50 +1020,44 @@ public class SnowflakeIO {
                   .discardingFiredPanes());
 
       int shards = (getShardsNumber() > 0) ? getShardsNumber() : DEFAULT_STREAMING_SHARDS_NUMBER;
-      PCollection files = writeFiles(inputInGlobalWindow, stagingBucketDir, shards);
+      PCollection<String> files = writeFiles(inputInGlobalWindow, stagingBucketDir, shards);
 
       /* Ensuring that files will be ingested after flush time */
       files =
-          (PCollection)
-              files.apply(
-                  "Apply User Trigger",
-                  Window.<T>into(new GlobalWindows())
-                      .triggering(
-                          Repeatedly.forever(
-                              AfterProcessingTime.pastFirstElementInPane()
-                                  .plusDelayOf(getFlushTimeLimit())))
-                      .discardingFiredPanes());
-      files =
-          (PCollection)
-              files.apply(
-                  "Create list of files for loading via SnowPipe",
-                  Combine.globally(new Concatenate()).withoutDefaults());
+          files.apply(
+              "Apply User Trigger",
+              Window.<String>into(new GlobalWindows())
+                  .triggering(
+                      Repeatedly.forever(
+                          AfterProcessingTime.pastFirstElementInPane()
+                              .plusDelayOf(getFlushTimeLimit())))
+                  .discardingFiredPanes());
+      PCollection<List<String>> filesConcatenated =
+          files.apply(
+              "Create list of files for loading via SnowPipe",
+              Combine.globally(new Concatenate()).withoutDefaults());
 
-      return (PCollection)
-          files.apply("Stream files to table", streamToTable(snowflakeService, stagingBucketDir));
+      return filesConcatenated.apply(
+          "Stream files to table", streamToTable(snowflakeServices, stagingBucketDir));
     }
 
-    private PCollection writeBatch(PCollection input, ValueProvider<String> stagingBucketDir) {
-      SnowflakeService snowflakeService =
-          getSnowflakeService() != null ? getSnowflakeService() : new SnowflakeBatchServiceImpl();
+    private PCollection<Void> writeBatch(
+        PCollection<T> input, ValueProvider<String> stagingBucketDir) {
+      SnowflakeServices snowflakeServices =
+          getSnowflakeServices() != null ? getSnowflakeServices() : new SnowflakeServicesImpl();
 
       PCollection<String> files = writeBatchFiles(input, stagingBucketDir);
 
       // Combining PCollection of files as a side input into one list of files
       ListCoder<String> coder = ListCoder.of(StringUtf8Coder.of());
-      files =
-          (PCollection)
-              files
-                  .getPipeline()
-                  .apply(
-                      Reify.viewInGlobalWindow(
-                          (PCollectionView) files.apply(View.asList()), coder));
+      PCollection<List<String>> reifiedFiles =
+          files.getPipeline().apply(Reify.viewInGlobalWindow(files.apply(View.asList()), coder));
 
-      return (PCollection)
-          files.apply("Copy files to table", copyToTable(snowflakeService, stagingBucketDir));
+      return reifiedFiles.apply(
+          "Copy files to table", copyToTable(snowflakeServices, stagingBucketDir));
     }
 
-    private PCollection writeBatchFiles(
+    private PCollection<String> writeBatchFiles(
         PCollection<T> input, ValueProvider<String> outputDirectory) {
       int shards = (getShardsNumber() > 0) ? getShardsNumber() : DEFAULT_BATCH_SHARDS_NUMBER;
       return writeFiles(input, outputDirectory, shards);
@@ -1092,7 +1081,7 @@ public class SnowflakeIO {
                   ParDo.of(new MapObjectsArrayToCsvFn(getQuotationMark())))
               .setCoder(StringUtf8Coder.of());
 
-      WriteFilesResult filesResult =
+      WriteFilesResult<Void> filesResult =
           mappedUserData.apply(
               "Write files to specified location",
               FileIO.<String>write()
@@ -1103,16 +1092,15 @@ public class SnowflakeIO {
                   .withNumShards(numShards)
                   .withCompression(Compression.GZIP));
 
-      return (PCollection)
-          filesResult
-              .getPerDestinationOutputFilenames()
-              .apply("Parse KV filenames to Strings", Values.<String>create());
+      return filesResult
+          .getPerDestinationOutputFilenames()
+          .apply("Parse KV filenames to Strings", Values.<String>create());
     }
 
-    private ParDo.SingleOutput<Object, Object> copyToTable(
-        SnowflakeService snowflakeService, ValueProvider<String> stagingBucketDir) {
+    private ParDo.SingleOutput<List<String>, Void> copyToTable(
+        SnowflakeServices snowflakeServices, ValueProvider<String> stagingBucketDir) {
       return ParDo.of(
-          new CopyToTableFn<>(
+          new CopyToTableFn(
               getDataSourceProviderFn(),
               getTable(),
               getQuery(),
@@ -1121,19 +1109,19 @@ public class SnowflakeIO {
               getCreateDisposition(),
               getWriteDisposition(),
               getTableSchema(),
-              snowflakeService,
+              snowflakeServices,
               getQuotationMark()));
     }
 
-    protected PTransform streamToTable(
-        SnowflakeService snowflakeService, ValueProvider<String> stagingBucketDir) {
+    protected ParDo.SingleOutput<List<String>, Void> streamToTable(
+        SnowflakeServices snowflakeServices, ValueProvider<String> stagingBucketDir) {
       return ParDo.of(
           new StreamToTableFn(
               getDataSourceProviderFn(),
               getSnowPipe(),
               stagingBucketDir,
               getDebugMode(),
-              snowflakeService));
+              snowflakeServices));
     }
   }
 
@@ -1206,7 +1194,7 @@ public class SnowflakeIO {
     }
   }
 
-  private static class CopyToTableFn<ParameterT, OutputT> extends DoFn<ParameterT, OutputT> {
+  private static class CopyToTableFn extends DoFn<List<String>, Void> {
     private final SerializableFunction<Void, DataSource> dataSourceProviderFn;
     private final ValueProvider<String> table;
     private final ValueProvider<String> database;
@@ -1218,7 +1206,7 @@ public class SnowflakeIO {
     private final ValueProvider<String> storageIntegrationName;
     private final WriteDisposition writeDisposition;
     private final CreateDisposition createDisposition;
-    private final SnowflakeService snowflakeService;
+    private final SnowflakeServices snowflakeServices;
 
     CopyToTableFn(
         SerializableFunction<Void, DataSource> dataSourceProviderFn,
@@ -1229,7 +1217,7 @@ public class SnowflakeIO {
         CreateDisposition createDisposition,
         WriteDisposition writeDisposition,
         SnowflakeTableSchema tableSchema,
-        SnowflakeService snowflakeService,
+        SnowflakeServices snowflakeServices,
         String quotationMark) {
       this.dataSourceProviderFn = dataSourceProviderFn;
       this.query = query;
@@ -1239,7 +1227,7 @@ public class SnowflakeIO {
       this.storageIntegrationName = storageIntegrationName;
       this.writeDisposition = writeDisposition;
       this.createDisposition = createDisposition;
-      this.snowflakeService = snowflakeService;
+      this.snowflakeServices = snowflakeServices;
       this.quotationMark = quotationMark;
 
       DataSourceProviderFromDataSourceConfiguration dataSourceProviderFromDataSourceConfiguration =
@@ -1260,7 +1248,7 @@ public class SnowflakeIO {
       SnowflakeBatchServiceConfig config =
           new SnowflakeBatchServiceConfig(
               dataSourceProviderFn,
-              (List<String>) context.element(),
+              context.element(),
               tableSchema,
               databaseValue,
               schemaValue,
@@ -1271,17 +1259,17 @@ public class SnowflakeIO {
               storageIntegrationName.get(),
               stagingBucketDir.get(),
               quotationMark);
-      snowflakeService.write(config);
+      snowflakeServices.getBatchService().write(config);
     }
   }
 
   /** Custom DoFn that streams data to Snowflake table. */
-  private static class StreamToTableFn<ParameterT, OutputT> extends DoFn<ParameterT, OutputT> {
+  private static class StreamToTableFn extends DoFn<List<String>, Void> {
     private final SerializableFunction<Void, DataSource> dataSourceProviderFn;
     private final ValueProvider<String> stagingBucketDir;
     private final ValueProvider<String> snowPipe;
     private final StreamingLogLevel debugMode;
-    private final SnowflakeService snowflakeService;
+    private final SnowflakeServices snowflakeServices;
     private transient SimpleIngestManager ingestManager;
 
     ArrayList<String> trackedFilesNames;
@@ -1291,12 +1279,12 @@ public class SnowflakeIO {
         ValueProvider<String> snowPipe,
         ValueProvider<String> stagingBucketDir,
         StreamingLogLevel debugMode,
-        SnowflakeService snowflakeService) {
+        SnowflakeServices snowflakeServices) {
       this.dataSourceProviderFn = dataSourceProviderFn;
       this.stagingBucketDir = stagingBucketDir;
       this.snowPipe = snowPipe;
       this.debugMode = debugMode;
-      this.snowflakeService = snowflakeService;
+      this.snowflakeServices = snowflakeServices;
       trackedFilesNames = new ArrayList<>();
     }
 
@@ -1335,7 +1323,7 @@ public class SnowflakeIO {
 
     @ProcessElement
     public void processElement(ProcessContext context) throws Exception {
-      List<String> filesList = (List<String>) context.element();
+      List<String> filesList = context.element();
 
       if (debugMode != null) {
         trackedFilesNames.addAll(filesList);
@@ -1343,7 +1331,7 @@ public class SnowflakeIO {
       SnowflakeStreamingServiceConfig config =
           new SnowflakeStreamingServiceConfig(
               filesList, this.stagingBucketDir.get(), this.ingestManager);
-      snowflakeService.write(config);
+      snowflakeServices.getStreamingService().write(config);
     }
 
     @FinishBundle
@@ -1395,7 +1383,6 @@ public class SnowflakeIO {
    */
   @AutoValue
   @AutoValue.CopyAnnotations
-  @SuppressWarnings({"rawtypes"})
   public abstract static class DataSourceConfiguration implements Serializable {
     @Nullable
     public abstract String getUrl();
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeTransformRegistrar.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeTransformRegistrar.java
index adc19201c8b..76bfb575976 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeTransformRegistrar.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeTransformRegistrar.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
+import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
 import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/WriteBuilder.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/WriteBuilder.java
index 89233f0937d..ba21f414878 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/WriteBuilder.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/WriteBuilder.java
@@ -36,7 +36,8 @@ import org.apache.beam.sdk.values.PDone;
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
 public class WriteBuilder
-    implements ExternalTransformBuilder<WriteBuilder.Configuration, PCollection<byte[]>, PDone> {
+    implements ExternalTransformBuilder<
+        WriteBuilder.Configuration, PCollection<List<byte[]>>, PDone> {
 
   /** Parameters class to expose the transform to an external SDK. */
   public static class Configuration extends CrossLanguageConfiguration {
@@ -76,8 +77,8 @@ public class WriteBuilder
   }
 
   @Override
-  public PTransform<PCollection<byte[]>, PDone> buildExternal(Configuration c) {
-    return SnowflakeIO.<byte[]>write()
+  public PTransform<PCollection<List<byte[]>>, PDone> buildExternal(Configuration c) {
+    return SnowflakeIO.<List<byte[]>>write()
         .withDataSourceConfiguration(c.getDataSourceConfiguration())
         .withStorageIntegrationName(c.getStorageIntegrationName())
         .withStagingBucketName(c.getStagingBucketName())
@@ -90,7 +91,6 @@ public class WriteBuilder
   }
 
   private static SnowflakeIO.UserDataMapper<List<byte[]>> getStringCsvMapper() {
-    return (SnowflakeIO.UserDataMapper<List<byte[]>>)
-        recordLine -> recordLine.stream().map(String::new).toArray();
+    return recordLine -> recordLine.stream().map(String::new).toArray();
   }
 }
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/data/SnowflakeTableSchema.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/data/SnowflakeTableSchema.java
index 095feb8c324..f47ea9fc339 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/data/SnowflakeTableSchema.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/data/SnowflakeTableSchema.java
@@ -22,11 +22,12 @@ import java.util.ArrayList;
 import java.util.List;
 import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
+import org.apache.beam.sdk.io.snowflake.enums.CreateDisposition;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
 
 /**
  * POJO representing schema of Table in Snowflake. Used by {@link SnowflakeIO.Write} when {@link
- * SnowflakeIO.Write.CreateDisposition#CREATE_IF_NEEDED} disposition is used.
+ * CreateDisposition#CREATE_IF_NEEDED} disposition is used.
  */
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeBatchServiceConfig.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeBatchServiceConfig.java
index d9f018bea50..36819e196aa 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeBatchServiceConfig.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeBatchServiceConfig.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-public class SnowflakeBatchServiceConfig extends ServiceConfig {
+public class SnowflakeBatchServiceConfig {
   private final SerializableFunction<Void, DataSource> dataSourceProviderFn;
 
   private final String database;
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeBatchServiceImpl.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeBatchServiceImpl.java
index 2700d8bfaa3..279e8fbb808 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeBatchServiceImpl.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeBatchServiceImpl.java
@@ -34,12 +34,11 @@ import org.apache.beam.sdk.io.snowflake.enums.CreateDisposition;
 import org.apache.beam.sdk.io.snowflake.enums.WriteDisposition;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 
-/** Implemenation of {@link SnowflakeService} used in production. */
+/** Implemenation of {@link SnowflakeServices.BatchService} used in production. */
 @SuppressWarnings({
-  "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-public class SnowflakeBatchServiceImpl implements SnowflakeService<SnowflakeBatchServiceConfig> {
+public class SnowflakeBatchServiceImpl implements SnowflakeServices.BatchService {
   private static final String SNOWFLAKE_GCS_PREFIX = "gcs://";
   private static final String GCS_PREFIX = "gs://";
 
@@ -168,7 +167,7 @@ public class SnowflakeBatchServiceImpl implements SnowflakeService<SnowflakeBatc
         selectQuery,
         resultSet -> {
           assert resultSet != null;
-          checkIfTableIsEmpty((ResultSet) resultSet);
+          checkIfTableIsEmpty(resultSet);
         });
   }
 
@@ -234,7 +233,7 @@ public class SnowflakeBatchServiceImpl implements SnowflakeService<SnowflakeBatc
         query,
         resultSet -> {
           assert resultSet != null;
-          if (!checkResultIfTableExists((ResultSet) resultSet)) {
+          if (!checkResultIfTableExists(resultSet)) {
             try {
               createTable(dataSource, table, tableSchema);
             } catch (SQLException e) {
@@ -271,13 +270,15 @@ public class SnowflakeBatchServiceImpl implements SnowflakeService<SnowflakeBatc
   }
 
   private static void runConnectionWithStatement(
-      DataSource dataSource, String query, Consumer resultSetMethod) throws SQLException {
+      DataSource dataSource, String query, Consumer<ResultSet> resultSetMethod)
+      throws SQLException {
     Connection connection = dataSource.getConnection();
     runStatement(query, connection, resultSetMethod);
     connection.close();
   }
 
-  private static void runStatement(String query, Connection connection, Consumer resultSetMethod)
+  private static void runStatement(
+      String query, Connection connection, Consumer<ResultSet> resultSetMethod)
       throws SQLException {
     PreparedStatement statement = connection.prepareStatement(query);
     try {
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeService.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeServices.java
similarity index 66%
copy from sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeService.java
copy to sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeServices.java
index 192856ec477..61b1df21d0e 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeService.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeServices.java
@@ -20,8 +20,20 @@ package org.apache.beam.sdk.io.snowflake.services;
 import java.io.Serializable;
 
 /** Interface which defines common methods for interacting with Snowflake. */
-public interface SnowflakeService<T extends ServiceConfig> extends Serializable {
-  String read(T config) throws Exception;
+public interface SnowflakeServices extends Serializable {
+  BatchService getBatchService();
 
-  void write(T config) throws Exception;
+  StreamingService getStreamingService();
+
+  interface BatchService {
+    String read(SnowflakeBatchServiceConfig config) throws Exception;
+
+    void write(SnowflakeBatchServiceConfig config) throws Exception;
+  }
+
+  interface StreamingService {
+    String read(SnowflakeStreamingServiceConfig config) throws Exception;
+
+    void write(SnowflakeStreamingServiceConfig config) throws Exception;
+  }
 }
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeService.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeServicesImpl.java
similarity index 75%
rename from sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeService.java
rename to sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeServicesImpl.java
index 192856ec477..a8ccf13ff48 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeService.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeServicesImpl.java
@@ -17,11 +17,14 @@
  */
 package org.apache.beam.sdk.io.snowflake.services;
 
-import java.io.Serializable;
+public class SnowflakeServicesImpl implements SnowflakeServices {
+  @Override
+  public BatchService getBatchService() {
+    return new SnowflakeBatchServiceImpl();
+  }
 
-/** Interface which defines common methods for interacting with Snowflake. */
-public interface SnowflakeService<T extends ServiceConfig> extends Serializable {
-  String read(T config) throws Exception;
-
-  void write(T config) throws Exception;
+  @Override
+  public StreamingService getStreamingService() {
+    return new SnowflakeStreamingServiceImpl();
+  }
 }
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeStreamingServiceConfig.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeStreamingServiceConfig.java
index 7039c899133..d8bb53c3f50 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeStreamingServiceConfig.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeStreamingServiceConfig.java
@@ -21,7 +21,7 @@ import java.util.List;
 import net.snowflake.ingest.SimpleIngestManager;
 
 /** Class for preparing configuration for streaming write. */
-public class SnowflakeStreamingServiceConfig extends ServiceConfig {
+public class SnowflakeStreamingServiceConfig {
   private final SimpleIngestManager ingestManager;
   private final List<String> filesList;
   private final String stagingBucketDir;
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeStreamingServiceImpl.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeStreamingServiceImpl.java
index 2634a17782c..73efef4a14e 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeStreamingServiceImpl.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeStreamingServiceImpl.java
@@ -25,12 +25,11 @@ import java.util.stream.Collectors;
 import net.snowflake.ingest.SimpleIngestManager;
 import net.snowflake.ingest.connection.IngestResponseException;
 
-/** Implemenation of {@link SnowflakeService} used in production. */
+/** Implementation of {@link SnowflakeServices.StreamingService} used in production. */
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-public class SnowflakeStreamingServiceImpl
-    implements SnowflakeService<SnowflakeStreamingServiceConfig> {
+public class SnowflakeStreamingServiceImpl implements SnowflakeServices.StreamingService {
 
   private transient SimpleIngestManager ingestManager;
 
diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeBatchServiceImpl.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeBatchServiceImpl.java
index 90ee4b9d394..d4c4523845b 100644
--- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeBatchServiceImpl.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeBatchServiceImpl.java
@@ -29,11 +29,10 @@ import org.apache.beam.sdk.io.snowflake.data.SnowflakeTableSchema;
 import org.apache.beam.sdk.io.snowflake.enums.CreateDisposition;
 import org.apache.beam.sdk.io.snowflake.enums.WriteDisposition;
 import org.apache.beam.sdk.io.snowflake.services.SnowflakeBatchServiceConfig;
-import org.apache.beam.sdk.io.snowflake.services.SnowflakeService;
+import org.apache.beam.sdk.io.snowflake.services.SnowflakeServices;
 
 /** Fake implementation of {@link SnowflakeService} used in tests. */
-public class FakeSnowflakeBatchServiceImpl
-    implements SnowflakeService<SnowflakeBatchServiceConfig> {
+public class FakeSnowflakeBatchServiceImpl implements SnowflakeServices.BatchService {
 
   @Override
   public void write(SnowflakeBatchServiceConfig config) throws Exception {
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/ServiceConfig.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeServicesImpl.java
similarity index 66%
rename from sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/ServiceConfig.java
rename to sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeServicesImpl.java
index 1826ce9368e..168741c3562 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/ServiceConfig.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeServicesImpl.java
@@ -15,10 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.io.snowflake.services;
+package org.apache.beam.sdk.io.snowflake.test;
 
-/**
- * Configuration abstract class for {@link SnowflakeService} that gives parameters for write and
- * read (batch and streaming).
- */
-public abstract class ServiceConfig {}
+import org.apache.beam.sdk.io.snowflake.services.SnowflakeServices;
+
+public class FakeSnowflakeServicesImpl implements SnowflakeServices {
+  @Override
+  public BatchService getBatchService() {
+    return new FakeSnowflakeBatchServiceImpl();
+  }
+
+  @Override
+  public StreamingService getStreamingService() {
+    return new FakeSnowflakeStreamingServiceImpl();
+  }
+}
diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeStreamingServiceImpl.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeStreamingServiceImpl.java
index 8dc4694e72a..ac49b6e979b 100644
--- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeStreamingServiceImpl.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/FakeSnowflakeStreamingServiceImpl.java
@@ -25,12 +25,11 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.zip.GZIPInputStream;
-import org.apache.beam.sdk.io.snowflake.services.SnowflakeService;
+import org.apache.beam.sdk.io.snowflake.services.SnowflakeServices;
 import org.apache.beam.sdk.io.snowflake.services.SnowflakeStreamingServiceConfig;
 
-/** Fake implementation of {@link SnowflakeService} used in tests. */
-public class FakeSnowflakeStreamingServiceImpl
-    implements SnowflakeService<SnowflakeStreamingServiceConfig> {
+/** Fake implementation of {@link SnowflakeServices.StreamingService} used in tests. */
+public class FakeSnowflakeStreamingServiceImpl implements SnowflakeServices.StreamingService {
   private FakeSnowflakeIngestManager snowflakeIngestManager;
 
   @Override
diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/TestUtils.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/TestUtils.java
index 8807a972eaf..61853cdb452 100644
--- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/TestUtils.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/TestUtils.java
@@ -48,9 +48,6 @@ import org.apache.beam.sdk.values.KV;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@SuppressWarnings({
-  "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
-})
 public class TestUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(TestUtils.class);
@@ -112,40 +109,37 @@ public class TestUtils {
   }
 
   public static SnowflakeIO.UserDataMapper<Long> getCsvMapper() {
-    return (SnowflakeIO.UserDataMapper<Long>) recordLine -> new String[] {recordLine.toString()};
+    return recordLine -> new String[] {recordLine.toString()};
   }
 
   public static SnowflakeIO.UserDataMapper<KV<String, Long>> getLongCsvMapperKV() {
-    return (SnowflakeIO.UserDataMapper<KV<String, Long>>)
-        recordLine -> new Long[] {recordLine.getValue()};
+    return recordLine -> new Long[] {recordLine.getValue()};
   }
 
   public static SnowflakeIO.UserDataMapper<Long> getLongCsvMapper() {
-    return (SnowflakeIO.UserDataMapper<Long>) recordLine -> new Long[] {recordLine};
+    return recordLine -> new Long[] {recordLine};
   }
 
   public static SnowflakeIO.CsvMapper<TestRow> getTestRowCsvMapper() {
-    return (SnowflakeIO.CsvMapper<TestRow>)
-        parts -> TestRow.create(Integer.valueOf(parts[0]), parts[1]);
+    return parts -> TestRow.create(Integer.valueOf(parts[0]), parts[1]);
   }
 
   public static SnowflakeIO.UserDataMapper<TestRow> getTestRowDataMapper() {
-    return (SnowflakeIO.UserDataMapper<TestRow>)
-        (TestRow element) -> new Object[] {element.id(), element.name()};
+    return (TestRow element) -> new Object[] {element.id(), element.name()};
   }
 
   public static SnowflakeIO.UserDataMapper<String[]> getLStringCsvMapper() {
-    return (SnowflakeIO.UserDataMapper<String[]>) recordLine -> recordLine;
+    return recordLine -> recordLine;
   }
 
   public static SnowflakeIO.UserDataMapper<String> getStringCsvMapper() {
-    return (SnowflakeIO.UserDataMapper<String>) recordLine -> new String[] {recordLine};
+    return recordLine -> new String[] {recordLine};
   }
 
   public static class ParseToKv extends DoFn<Long, KV<String, Long>> {
     @ProcessElement
     public void processElement(ProcessContext c) {
-      KV stringIntKV = KV.of(c.element().toString(), c.element().longValue());
+      KV<String, Long> stringIntKV = KV.of(c.element().toString(), c.element().longValue());
       c.output(stringIntKV);
     }
   }
@@ -167,24 +161,24 @@ public class TestUtils {
     return lines;
   }
 
-  public static String getValidEncryptedPrivateKeyPath(Class c) {
+  public static String getValidEncryptedPrivateKeyPath(Class<?> c) {
     return getPrivateKeyPath(c, VALID_ENCRYPTED_PRIVATE_KEY_FILE_NAME);
   }
 
-  public static String getValidUnencryptedPrivateKeyPath(Class c) {
+  public static String getValidUnencryptedPrivateKeyPath(Class<?> c) {
     return getPrivateKeyPath(c, VALID_UNENCRYPTED_PRIVATE_KEY_FILE_NAME);
   }
 
-  public static String getInvalidPrivateKeyPath(Class c) {
+  public static String getInvalidPrivateKeyPath(Class<?> c) {
     return getPrivateKeyPath(c, INVALID_PRIVATE_KEY_FILE_NAME);
   }
 
-  public static String getRawValidEncryptedPrivateKey(Class c) throws IOException {
+  public static String getRawValidEncryptedPrivateKey(Class<?> c) throws IOException {
     byte[] keyBytes = Files.readAllBytes(Paths.get(getValidEncryptedPrivateKeyPath(c)));
     return new String(keyBytes, StandardCharsets.UTF_8);
   }
 
-  public static String getRawValidUnencryptedPrivateKey(Class c) throws IOException {
+  public static String getRawValidUnencryptedPrivateKey(Class<?> c) throws IOException {
     byte[] keyBytes = Files.readAllBytes(Paths.get(getValidUnencryptedPrivateKeyPath(c)));
     return new String(keyBytes, StandardCharsets.UTF_8);
   }
@@ -193,7 +187,7 @@ public class TestUtils {
     return PRIVATE_KEY_PASSPHRASE;
   }
 
-  private static String getPrivateKeyPath(Class c, String path) {
+  private static String getPrivateKeyPath(Class<?> c, String path) {
     ClassLoader classLoader = c.getClassLoader();
     File file = new File(classLoader.getResource(path).getFile());
     return file.getAbsolutePath();
diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/data/SnowflakeDataTypeValidTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/data/SnowflakeDataTypeValidTest.java
index e42a6c712a3..f72c16e270a 100644
--- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/data/SnowflakeDataTypeValidTest.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/data/SnowflakeDataTypeValidTest.java
@@ -51,9 +51,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 @RunWith(Parameterized.class)
-@SuppressWarnings({
-  "rawtypes" // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
-})
 public class SnowflakeDataTypeValidTest {
   private SnowflakeDataType snowflakeDataType;
   private String expectedResult;
@@ -64,7 +61,7 @@ public class SnowflakeDataTypeValidTest {
   }
 
   @Parameterized.Parameters
-  public static Collection primeNumbers() {
+  public static Collection<Object[]> primeNumbers() {
     return Arrays.asList(
         new Object[][] {
           {SnowflakeBoolean.of(), "BOOLEAN"},
diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/read/SnowflakeIOReadTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/read/SnowflakeIOReadTest.java
index 024ef47aac2..10403fda285 100644
--- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/read/SnowflakeIOReadTest.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/read/SnowflakeIOReadTest.java
@@ -26,10 +26,10 @@ import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.io.AvroGeneratedUser;
 import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
-import org.apache.beam.sdk.io.snowflake.services.SnowflakeService;
+import org.apache.beam.sdk.io.snowflake.services.SnowflakeServices;
 import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBasicDataSource;
-import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBatchServiceImpl;
 import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeDatabase;
+import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeServicesImpl;
 import org.apache.beam.sdk.io.snowflake.test.TestSnowflakePipelineOptions;
 import org.apache.beam.sdk.io.snowflake.test.TestUtils;
 import org.apache.beam.sdk.testing.PAssert;
@@ -45,9 +45,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
-@SuppressWarnings({
-  "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
-})
 public class SnowflakeIOReadTest implements Serializable {
   public static final String FAKE_TABLE = "FAKE_TABLE";
   public static final String FAKE_QUERY = "SELECT * FROM FAKE_TABLE";
@@ -60,7 +57,7 @@ public class SnowflakeIOReadTest implements Serializable {
   @Rule public transient ExpectedException thrown = ExpectedException.none();
 
   private static SnowflakeIO.DataSourceConfiguration dataSourceConfiguration;
-  private static SnowflakeService snowflakeService;
+  private static SnowflakeServices snowflakeServices;
   private static List<GenericRecord> avroTestData;
 
   @BeforeClass
@@ -83,7 +80,7 @@ public class SnowflakeIOReadTest implements Serializable {
         SnowflakeIO.DataSourceConfiguration.create(new FakeSnowflakeBasicDataSource())
             .withServerName(options.getServerName());
 
-    snowflakeService = new FakeSnowflakeBatchServiceImpl();
+    snowflakeServices = new FakeSnowflakeServicesImpl();
   }
 
   @AfterClass
@@ -97,7 +94,7 @@ public class SnowflakeIOReadTest implements Serializable {
     thrown.expectMessage("withStagingBucketName() is required");
 
     pipeline.apply(
-        SnowflakeIO.<GenericRecord>read(snowflakeService)
+        SnowflakeIO.<GenericRecord>read(snowflakeServices)
             .withDataSourceConfiguration(dataSourceConfiguration)
             .fromTable(FAKE_TABLE)
             .withStorageIntegrationName(options.getStorageIntegrationName())
@@ -113,7 +110,7 @@ public class SnowflakeIOReadTest implements Serializable {
     thrown.expectMessage("withStorageIntegrationName() is required");
 
     pipeline.apply(
-        SnowflakeIO.<GenericRecord>read(snowflakeService)
+        SnowflakeIO.<GenericRecord>read(snowflakeServices)
             .withDataSourceConfiguration(dataSourceConfiguration)
             .fromTable(FAKE_TABLE)
             .withStagingBucketName(options.getStagingBucketName())
@@ -129,7 +126,7 @@ public class SnowflakeIOReadTest implements Serializable {
     thrown.expectMessage("withCsvMapper() is required");
 
     pipeline.apply(
-        SnowflakeIO.<GenericRecord>read(snowflakeService)
+        SnowflakeIO.<GenericRecord>read(snowflakeServices)
             .withDataSourceConfiguration(dataSourceConfiguration)
             .fromTable(FAKE_TABLE)
             .withStagingBucketName(options.getStagingBucketName())
@@ -145,7 +142,7 @@ public class SnowflakeIOReadTest implements Serializable {
     thrown.expectMessage("withCoder() is required");
 
     pipeline.apply(
-        SnowflakeIO.<GenericRecord>read(snowflakeService)
+        SnowflakeIO.<GenericRecord>read(snowflakeServices)
             .withDataSourceConfiguration(dataSourceConfiguration)
             .fromTable(FAKE_TABLE)
             .withStagingBucketName(options.getStagingBucketName())
@@ -161,7 +158,7 @@ public class SnowflakeIOReadTest implements Serializable {
     thrown.expectMessage("fromTable() or fromQuery() is required");
 
     pipeline.apply(
-        SnowflakeIO.<GenericRecord>read(snowflakeService)
+        SnowflakeIO.<GenericRecord>read(snowflakeServices)
             .withDataSourceConfiguration(dataSourceConfiguration)
             .withStagingBucketName(options.getStagingBucketName())
             .withStorageIntegrationName(options.getStorageIntegrationName())
@@ -177,7 +174,7 @@ public class SnowflakeIOReadTest implements Serializable {
     thrown.expectMessage("withDataSourceConfiguration() or withDataSourceProviderFn() is required");
 
     pipeline.apply(
-        SnowflakeIO.<GenericRecord>read(snowflakeService)
+        SnowflakeIO.<GenericRecord>read(snowflakeServices)
             .fromTable(FAKE_TABLE)
             .withStagingBucketName(options.getStagingBucketName())
             .withStorageIntegrationName(options.getStorageIntegrationName())
@@ -193,7 +190,7 @@ public class SnowflakeIOReadTest implements Serializable {
     thrown.expectMessage("fromTable() and fromQuery() are not allowed together");
 
     pipeline.apply(
-        SnowflakeIO.<GenericRecord>read(snowflakeService)
+        SnowflakeIO.<GenericRecord>read(snowflakeServices)
             .withDataSourceConfiguration(dataSourceConfiguration)
             .fromQuery("")
             .fromTable(FAKE_TABLE)
@@ -211,7 +208,7 @@ public class SnowflakeIOReadTest implements Serializable {
     thrown.expectMessage("SQL compilation error: Table does not exist");
 
     pipeline.apply(
-        SnowflakeIO.<GenericRecord>read(snowflakeService)
+        SnowflakeIO.<GenericRecord>read(snowflakeServices)
             .withDataSourceConfiguration(dataSourceConfiguration)
             .fromTable("NON_EXIST")
             .withStagingBucketName(options.getStagingBucketName())
@@ -228,7 +225,7 @@ public class SnowflakeIOReadTest implements Serializable {
     thrown.expectMessage("SQL compilation error: Invalid query");
 
     pipeline.apply(
-        SnowflakeIO.<GenericRecord>read(snowflakeService)
+        SnowflakeIO.<GenericRecord>read(snowflakeServices)
             .withDataSourceConfiguration(dataSourceConfiguration)
             .fromQuery("BAD_QUERY")
             .withStagingBucketName(options.getStagingBucketName())
@@ -243,7 +240,7 @@ public class SnowflakeIOReadTest implements Serializable {
   public void testReadFromTable() {
     PCollection<GenericRecord> items =
         pipeline.apply(
-            SnowflakeIO.<GenericRecord>read(snowflakeService)
+            SnowflakeIO.<GenericRecord>read(snowflakeServices)
                 .withDataSourceConfiguration(dataSourceConfiguration)
                 .fromTable(FAKE_TABLE)
                 .withStagingBucketName(options.getStagingBucketName())
@@ -259,7 +256,7 @@ public class SnowflakeIOReadTest implements Serializable {
   public void testReadFromQuery() {
     PCollection<GenericRecord> items =
         pipeline.apply(
-            SnowflakeIO.<GenericRecord>read(snowflakeService)
+            SnowflakeIO.<GenericRecord>read(snowflakeServices)
                 .withDataSourceConfiguration(dataSourceConfiguration)
                 .fromQuery(FAKE_QUERY)
                 .withStagingBucketName(options.getStagingBucketName())
@@ -272,12 +269,11 @@ public class SnowflakeIOReadTest implements Serializable {
   }
 
   static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
-    return (SnowflakeIO.CsvMapper<GenericRecord>)
-        parts ->
-            new GenericRecordBuilder(AvroGeneratedUser.getClassSchema())
-                .set("name", String.valueOf(parts[0]))
-                .set("favorite_number", Integer.valueOf(parts[1]))
-                .set("favorite_color", String.valueOf(parts[2]))
-                .build();
+    return parts ->
+        new GenericRecordBuilder(AvroGeneratedUser.getClassSchema())
+            .set("name", String.valueOf(parts[0]))
+            .set("favorite_number", Integer.valueOf(parts[1]))
+            .set("favorite_color", String.valueOf(parts[2]))
+            .build();
   }
 }
diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/CreateDispositionTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/CreateDispositionTest.java
index f91fe31f5fe..f279c086089 100644
--- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/CreateDispositionTest.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/CreateDispositionTest.java
@@ -29,10 +29,10 @@ import org.apache.beam.sdk.io.snowflake.data.SnowflakeColumn;
 import org.apache.beam.sdk.io.snowflake.data.SnowflakeTableSchema;
 import org.apache.beam.sdk.io.snowflake.data.text.SnowflakeVarchar;
 import org.apache.beam.sdk.io.snowflake.enums.CreateDisposition;
-import org.apache.beam.sdk.io.snowflake.services.SnowflakeService;
+import org.apache.beam.sdk.io.snowflake.services.SnowflakeServices;
 import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBasicDataSource;
-import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBatchServiceImpl;
 import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeDatabase;
+import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeServicesImpl;
 import org.apache.beam.sdk.io.snowflake.test.TestSnowflakePipelineOptions;
 import org.apache.beam.sdk.io.snowflake.test.TestUtils;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -48,9 +48,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
-@SuppressWarnings({
-  "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
-})
 public class CreateDispositionTest {
   private static final String FAKE_TABLE = "FAKE_TABLE";
   private static final String BUCKET_NAME = "bucket/";
@@ -63,7 +60,7 @@ public class CreateDispositionTest {
   private static String stagingBucketName;
   private static String storageIntegrationName;
 
-  private static SnowflakeService snowflakeService;
+  private static SnowflakeServices snowflakeServices;
   private static List<Long> testData;
 
   @BeforeClass
@@ -76,7 +73,7 @@ public class CreateDispositionTest {
     stagingBucketName = options.getStagingBucketName();
     storageIntegrationName = options.getStorageIntegrationName();
 
-    snowflakeService = new FakeSnowflakeBatchServiceImpl();
+    snowflakeServices = new FakeSnowflakeServicesImpl();
     testData = LongStream.range(0, 100).boxed().collect(Collectors.toList());
 
     dc =
@@ -109,7 +106,7 @@ public class CreateDispositionTest {
                 .withUserDataMapper(TestUtils.getLongCsvMapper())
                 .withFileNameTemplate("output")
                 .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
-                .withSnowflakeService(snowflakeService));
+                .withSnowflakeServices(snowflakeServices));
 
     pipeline.run(options).waitUntilFinish();
 
@@ -137,7 +134,7 @@ public class CreateDispositionTest {
                 .withFileNameTemplate("output")
                 .withUserDataMapper(getCsvMapper())
                 .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
-                .withSnowflakeService(snowflakeService));
+                .withSnowflakeServices(snowflakeServices));
 
     pipeline.run(options).waitUntilFinish();
   }
@@ -161,7 +158,7 @@ public class CreateDispositionTest {
                 .withFileNameTemplate("output")
                 .withUserDataMapper(TestUtils.getLongCsvMapper())
                 .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
-                .withSnowflakeService(snowflakeService));
+                .withSnowflakeServices(snowflakeServices));
 
     pipeline.run(options).waitUntilFinish();
     List<Long> actualData = FakeSnowflakeDatabase.getElementsAsLong("NO_EXIST_TABLE");
@@ -185,7 +182,7 @@ public class CreateDispositionTest {
                 .withFileNameTemplate("output")
                 .withUserDataMapper(TestUtils.getLongCsvMapper())
                 .withCreateDisposition(CreateDisposition.CREATE_NEVER)
-                .withSnowflakeService(snowflakeService));
+                .withSnowflakeServices(snowflakeServices));
 
     pipeline.run(options).waitUntilFinish();
     List<Long> actualData = FakeSnowflakeDatabase.getElementsAsLong(FAKE_TABLE);
@@ -211,7 +208,7 @@ public class CreateDispositionTest {
                 .withFileNameTemplate("output")
                 .withUserDataMapper(TestUtils.getLongCsvMapper())
                 .withCreateDisposition(CreateDisposition.CREATE_NEVER)
-                .withSnowflakeService(snowflakeService));
+                .withSnowflakeServices(snowflakeServices));
 
     pipeline.run(options).waitUntilFinish();
   }
diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/QueryDispositionLocationTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/QueryDispositionLocationTest.java
index 1aa1ad0b1b9..47f3bfeaa8a 100644
--- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/QueryDispositionLocationTest.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/QueryDispositionLocationTest.java
@@ -26,10 +26,10 @@ import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
 import org.apache.beam.sdk.io.snowflake.enums.WriteDisposition;
-import org.apache.beam.sdk.io.snowflake.services.SnowflakeService;
+import org.apache.beam.sdk.io.snowflake.services.SnowflakeServices;
 import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBasicDataSource;
-import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBatchServiceImpl;
 import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeDatabase;
+import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeServicesImpl;
 import org.apache.beam.sdk.io.snowflake.test.TestSnowflakePipelineOptions;
 import org.apache.beam.sdk.io.snowflake.test.TestUtils;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -45,9 +45,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
-@SuppressWarnings({
-  "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
-})
 public class QueryDispositionLocationTest {
   private static final String FAKE_TABLE = "FAKE_TABLE";
   private static final String BUCKET_NAME = "BUCKET/";
@@ -58,7 +55,7 @@ public class QueryDispositionLocationTest {
   private static TestSnowflakePipelineOptions options;
   private static SnowflakeIO.DataSourceConfiguration dc;
 
-  private static SnowflakeService snowflakeService;
+  private static SnowflakeServices snowflakeServices;
   private static List<Long> testData;
 
   @BeforeClass
@@ -66,7 +63,7 @@ public class QueryDispositionLocationTest {
     PipelineOptionsFactory.register(TestSnowflakePipelineOptions.class);
     options = TestPipeline.testingPipelineOptions().as(TestSnowflakePipelineOptions.class);
 
-    snowflakeService = new FakeSnowflakeBatchServiceImpl();
+    snowflakeServices = new FakeSnowflakeServicesImpl();
     testData = LongStream.range(0, 100).boxed().collect(Collectors.toList());
   }
 
@@ -102,7 +99,7 @@ public class QueryDispositionLocationTest {
                 .withUserDataMapper(TestUtils.getLongCsvMapper())
                 .withFileNameTemplate("output")
                 .withWriteDisposition(WriteDisposition.TRUNCATE)
-                .withSnowflakeService(snowflakeService));
+                .withSnowflakeServices(snowflakeServices));
 
     pipeline.run(options).waitUntilFinish();
 
@@ -131,7 +128,7 @@ public class QueryDispositionLocationTest {
                 .withUserDataMapper(TestUtils.getLongCsvMapper())
                 .withFileNameTemplate("output")
                 .withWriteDisposition(WriteDisposition.EMPTY)
-                .withSnowflakeService(snowflakeService));
+                .withSnowflakeServices(snowflakeServices));
 
     pipeline.run(options).waitUntilFinish();
   }
@@ -152,7 +149,7 @@ public class QueryDispositionLocationTest {
                 .withFileNameTemplate("output")
                 .withUserDataMapper(TestUtils.getLongCsvMapper())
                 .withWriteDisposition(WriteDisposition.EMPTY)
-                .withSnowflakeService(snowflakeService));
+                .withSnowflakeServices(snowflakeServices));
 
     pipeline.run(options).waitUntilFinish();
 
diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/SchemaDispositionTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/SchemaDispositionTest.java
index 89edc91c411..4073ac8b59e 100644
--- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/SchemaDispositionTest.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/SchemaDispositionTest.java
@@ -35,10 +35,10 @@ import org.apache.beam.sdk.io.snowflake.data.structured.SnowflakeObject;
 import org.apache.beam.sdk.io.snowflake.data.structured.SnowflakeVariant;
 import org.apache.beam.sdk.io.snowflake.data.text.SnowflakeText;
 import org.apache.beam.sdk.io.snowflake.enums.CreateDisposition;
-import org.apache.beam.sdk.io.snowflake.services.SnowflakeService;
+import org.apache.beam.sdk.io.snowflake.services.SnowflakeServices;
 import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBasicDataSource;
-import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBatchServiceImpl;
 import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeDatabase;
+import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeServicesImpl;
 import org.apache.beam.sdk.io.snowflake.test.TestSnowflakePipelineOptions;
 import org.apache.beam.sdk.io.snowflake.test.TestUtils;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -54,9 +54,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
-@SuppressWarnings({
-  "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
-})
 public class SchemaDispositionTest {
   private static final String BUCKET_NAME = "BUCKET/";
 
@@ -68,7 +65,7 @@ public class SchemaDispositionTest {
   private static String stagingBucketName;
   private static String storageIntegrationName;
 
-  private static SnowflakeService snowflakeService;
+  private static SnowflakeServices snowflakeServices;
 
   @BeforeClass
   public static void setupAll() {
@@ -80,7 +77,7 @@ public class SchemaDispositionTest {
     stagingBucketName = options.getStagingBucketName();
     storageIntegrationName = options.getStorageIntegrationName();
 
-    snowflakeService = new FakeSnowflakeBatchServiceImpl();
+    snowflakeServices = new FakeSnowflakeServicesImpl();
 
     dc =
         SnowflakeIO.DataSourceConfiguration.create(new FakeSnowflakeBasicDataSource())
@@ -130,7 +127,7 @@ public class SchemaDispositionTest {
                 .withFileNameTemplate("output")
                 .withUserDataMapper(TestUtils.getLStringCsvMapper())
                 .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
-                .withSnowflakeService(snowflakeService));
+                .withSnowflakeServices(snowflakeServices));
 
     pipeline.run(options).waitUntilFinish();
 
@@ -168,7 +165,7 @@ public class SchemaDispositionTest {
                 .withFileNameTemplate("output")
                 .withUserDataMapper(getCsvMapper())
                 .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
-                .withSnowflakeService(snowflakeService));
+                .withSnowflakeServices(snowflakeServices));
 
     pipeline.run(options).waitUntilFinish();
 
@@ -208,7 +205,7 @@ public class SchemaDispositionTest {
                 .withFileNameTemplate("output")
                 .withUserDataMapper(getCsvMapper())
                 .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
-                .withSnowflakeService(snowflakeService));
+                .withSnowflakeServices(snowflakeServices));
 
     pipeline.run(options).waitUntilFinish();
 
diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/SnowflakeIOWriteTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/SnowflakeIOWriteTest.java
index 185015169d9..f5784284098 100644
--- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/SnowflakeIOWriteTest.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/SnowflakeIOWriteTest.java
@@ -26,10 +26,10 @@ import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 import net.snowflake.client.jdbc.SnowflakeSQLException;
 import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
-import org.apache.beam.sdk.io.snowflake.services.SnowflakeService;
+import org.apache.beam.sdk.io.snowflake.services.SnowflakeServices;
 import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBasicDataSource;
-import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBatchServiceImpl;
 import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeDatabase;
+import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeServicesImpl;
 import org.apache.beam.sdk.io.snowflake.test.TestSnowflakePipelineOptions;
 import org.apache.beam.sdk.io.snowflake.test.TestUtils;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -47,9 +47,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
-@SuppressWarnings({
-  "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
-})
 public class SnowflakeIOWriteTest {
   private static final String FAKE_TABLE = "FAKE_TABLE";
   private static final String BUCKET_NAME = "BUCKET/";
@@ -61,13 +58,13 @@ public class SnowflakeIOWriteTest {
   private static TestSnowflakePipelineOptions options;
   private static SnowflakeIO.DataSourceConfiguration dc;
 
-  private static SnowflakeService snowflakeService;
+  private static SnowflakeServices snowflakeServices;
   private static List<Long> testData;
   private static List<String> testDataInStrings;
 
   @BeforeClass
   public static void setupAll() {
-    snowflakeService = new FakeSnowflakeBatchServiceImpl();
+    snowflakeServices = new FakeSnowflakeServicesImpl();
     testData = LongStream.range(0, 100).boxed().collect(Collectors.toList());
 
     testDataInStrings = new ArrayList<>();
@@ -112,7 +109,7 @@ public class SnowflakeIOWriteTest {
                 .to(FAKE_TABLE)
                 .withStagingBucketName(options.getStagingBucketName())
                 .withStorageIntegrationName(options.getStorageIntegrationName())
-                .withSnowflakeService(snowflakeService));
+                .withSnowflakeServices(snowflakeServices));
 
     pipeline.run(options).waitUntilFinish();
 
@@ -133,7 +130,7 @@ public class SnowflakeIOWriteTest {
                 .withStorageIntegrationName(options.getStorageIntegrationName())
                 .withDataSourceConfiguration(dc)
                 .withUserDataMapper(TestUtils.getLongCsvMapper())
-                .withSnowflakeService(snowflakeService));
+                .withSnowflakeServices(snowflakeServices));
 
     pipeline.run(options).waitUntilFinish();
 
@@ -155,7 +152,7 @@ public class SnowflakeIOWriteTest {
                 .to(FAKE_TABLE)
                 .withStagingBucketName(options.getStagingBucketName())
                 .withStorageIntegrationName(options.getStorageIntegrationName())
-                .withSnowflakeService(snowflakeService));
+                .withSnowflakeServices(snowflakeServices));
 
     pipeline.run(options).waitUntilFinish();
 
@@ -180,7 +177,7 @@ public class SnowflakeIOWriteTest {
                 .withUserDataMapper(TestUtils.getLongCsvMapperKV())
                 .withDataSourceConfiguration(dc)
                 .withQueryTransformation(query)
-                .withSnowflakeService(snowflakeService));
+                .withSnowflakeServices(snowflakeServices));
 
     pipeline.run(options).waitUntilFinish();
 
@@ -202,7 +199,7 @@ public class SnowflakeIOWriteTest {
                 .to(FAKE_TABLE)
                 .withStagingBucketName(options.getStagingBucketName())
                 .withStorageIntegrationName(options.getStorageIntegrationName())
-                .withSnowflakeService(snowflakeService)
+                .withSnowflakeServices(snowflakeServices)
                 .withQuotationMark("\""));
 
     pipeline.run(options).waitUntilFinish();
@@ -228,7 +225,7 @@ public class SnowflakeIOWriteTest {
                 .to(FAKE_TABLE)
                 .withStagingBucketName(options.getStagingBucketName())
                 .withStorageIntegrationName(options.getStorageIntegrationName())
-                .withSnowflakeService(snowflakeService)
+                .withSnowflakeServices(snowflakeServices)
                 .withQuotationMark(""));
 
     pipeline.run(options).waitUntilFinish();
diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/StreamingWriteTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/StreamingWriteTest.java
index a7f2cbe17e9..780aa26356a 100644
--- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/StreamingWriteTest.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/StreamingWriteTest.java
@@ -33,10 +33,10 @@ import java.util.stream.LongStream;
 import net.snowflake.client.jdbc.SnowflakeSQLException;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
-import org.apache.beam.sdk.io.snowflake.services.SnowflakeService;
+import org.apache.beam.sdk.io.snowflake.services.SnowflakeServices;
 import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBasicDataSource;
 import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeDatabase;
-import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeStreamingServiceImpl;
+import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeServicesImpl;
 import org.apache.beam.sdk.io.snowflake.test.TestSnowflakePipelineOptions;
 import org.apache.beam.sdk.io.snowflake.test.TestUtils;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -58,9 +58,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
-@SuppressWarnings({
-  "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
-})
 public class StreamingWriteTest {
 
   private static final String FAKE_TABLE = "TEST_TABLE";
@@ -73,7 +70,7 @@ public class StreamingWriteTest {
 
   @Rule public ExpectedException exceptionRule = ExpectedException.none();
   private static SnowflakeIO.DataSourceConfiguration dataSourceConfiguration;
-  private static SnowflakeService snowflakeService;
+  private static SnowflakeServices snowflakeServices;
   private static TestSnowflakePipelineOptions options;
   private static List<Long> testData;
 
@@ -92,7 +89,7 @@ public class StreamingWriteTest {
 
   @BeforeClass
   public static void setup() {
-    snowflakeService = new FakeSnowflakeStreamingServiceImpl();
+    snowflakeServices = new FakeSnowflakeServicesImpl();
 
     PipelineOptionsFactory.register(TestSnowflakePipelineOptions.class);
     options = TestPipeline.testingPipelineOptions().as(TestSnowflakePipelineOptions.class);
@@ -138,7 +135,7 @@ public class StreamingWriteTest {
                 .withStorageIntegrationName(STORAGE_INTEGRATION_NAME)
                 .withSnowPipe(SNOW_PIPE)
                 .withUserDataMapper(TestUtils.getLongCsvMapper())
-                .withSnowflakeService(snowflakeService));
+                .withSnowflakeServices(snowflakeServices));
 
     pipeline.run(options);
   }
@@ -165,7 +162,7 @@ public class StreamingWriteTest {
                 .withStorageIntegrationName(STORAGE_INTEGRATION_NAME)
                 .withSnowPipe(SNOW_PIPE)
                 .withUserDataMapper(TestUtils.getLongCsvMapper())
-                .withSnowflakeService(snowflakeService));
+                .withSnowflakeServices(snowflakeServices));
 
     pipeline.run(options);
   }
@@ -211,7 +208,7 @@ public class StreamingWriteTest {
                 .withFlushRowLimit(4)
                 .withFlushTimeLimit(WINDOW_DURATION)
                 .withUserDataMapper(TestUtils.getStringCsvMapper())
-                .withSnowflakeService(snowflakeService));
+                .withSnowflakeServices(snowflakeServices));
 
     pipeline.run(options).waitUntilFinish();
 
@@ -271,7 +268,7 @@ public class StreamingWriteTest {
                 .withQuotationMark(quotationMark)
                 .withFlushTimeLimit(WINDOW_DURATION)
                 .withUserDataMapper(TestUtils.getStringCsvMapper())
-                .withSnowflakeService(snowflakeService));
+                .withSnowflakeServices(snowflakeServices));
 
     pipeline.run(options).waitUntilFinish();