You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/10 18:34:03 UTC

[1/2] incubator-beam git commit: This closes #1308

Repository: incubator-beam
Updated Branches:
  refs/heads/master 8d403a289 -> ab06647f9


This closes #1308


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ab06647f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ab06647f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ab06647f

Branch: refs/heads/master
Commit: ab06647f9d9c4fb72d2571dc2225b71b9fbdffbd
Parents: 8d403a2 0ebb7a1
Author: Thomas Groh <tg...@google.com>
Authored: Thu Nov 10 10:27:01 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Nov 10 10:27:01 2016 -0800

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    | 98 ++++++++++++++------
 .../sdk/io/gcp/bigtable/BigtableService.java    |  6 ++
 .../io/gcp/bigtable/BigtableServiceImpl.java    |  5 +
 .../sdk/io/gcp/bigtable/BigtableIOTest.java     | 83 +++++++++++++++--
 4 files changed, 158 insertions(+), 34 deletions(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: Use Credentials from GcpOptions instead of BigtableOptions

Posted by tg...@apache.org.
Use Credentials from GcpOptions instead of BigtableOptions

Fixes [BEAM-939]


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0ebb7a1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0ebb7a1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0ebb7a1e

Branch: refs/heads/master
Commit: 0ebb7a1edb98c035b54d382a2e2d77331dac829a
Parents: 8d403a2
Author: Luke Cwik <lc...@google.com>
Authored: Mon Nov 7 19:37:42 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Nov 10 10:27:01 2016 -0800

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    | 98 ++++++++++++++------
 .../sdk/io/gcp/bigtable/BigtableService.java    |  6 ++
 .../io/gcp/bigtable/BigtableServiceImpl.java    |  5 +
 .../sdk/io/gcp/bigtable/BigtableIOTest.java     | 83 +++++++++++++++--
 4 files changed, 158 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0ebb7a1e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 90b9584..1ee9253 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -27,7 +27,10 @@ import com.google.bigtable.v2.Row;
 import com.google.bigtable.v2.RowFilter;
 import com.google.bigtable.v2.SampleRowKeysResponse;
 import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.CredentialOptions;
+import com.google.cloud.bigtable.config.CredentialOptions.CredentialType;
 import com.google.cloud.bigtable.config.RetryOptions;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.FutureCallback;
@@ -49,11 +52,13 @@ import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.range.ByteKey;
 import org.apache.beam.sdk.io.range.ByteKeyRange;
 import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
+import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.ReleaseInfo;
 import org.apache.beam.sdk.values.KV;
@@ -283,7 +288,12 @@ public class BigtableIO {
     @Override
     public PCollection<Row> apply(PBegin input) {
       BigtableSource source =
-          new BigtableSource(getBigtableService(), tableId, filter, keyRange, null);
+          new BigtableSource(new SerializableFunction<PipelineOptions, BigtableService>() {
+            @Override
+            public BigtableService apply(PipelineOptions options) {
+              return getBigtableService(options);
+            }
+          }, tableId, filter, keyRange, null);
       return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
     }
 
@@ -293,7 +303,9 @@ public class BigtableIO {
       checkArgument(!tableId.isEmpty(), "Table ID not specified");
       try {
         checkArgument(
-            getBigtableService().tableExists(tableId), "Table %s does not exist", tableId);
+            getBigtableService(input.getPipeline().getOptions()).tableExists(tableId),
+            "Table %s does not exist",
+            tableId);
       } catch (IOException e) {
         logger.warn("Error checking whether table {} exists; proceeding.", tableId, e);
       }
@@ -372,12 +384,22 @@ public class BigtableIO {
      * Helper function that either returns the mock Bigtable service supplied by
      * {@link #withBigtableService} or creates and returns an implementation that talks to
      * {@code Cloud Bigtable}.
+     *
+     * <p>Also populate the credentials option from {@link GcpOptions#getGcpCredential()} if the
+     * default credentials are being used on {@link BigtableOptions}.
      */
-    private BigtableService getBigtableService() {
+    @VisibleForTesting
+    BigtableService getBigtableService(PipelineOptions pipelineOptions) {
       if (bigtableService != null) {
         return bigtableService;
       }
-      return new BigtableServiceImpl(options);
+      BigtableOptions.Builder clonedOptions = options.toBuilder();
+      if (options.getCredentialOptions().getCredentialType() == CredentialType.DefaultCredentials) {
+        clonedOptions.setCredentialOptions(
+            CredentialOptions.credential(
+                pipelineOptions.as(GcpOptions.class).getGcpCredential()));
+      }
+      return new BigtableServiceImpl(clonedOptions.build());
     }
   }
 
@@ -479,7 +501,13 @@ public class BigtableIO {
 
     @Override
     public PDone apply(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
-      input.apply(ParDo.of(new BigtableWriterFn(tableId, getBigtableService())));
+      input.apply(ParDo.of(new BigtableWriterFn(tableId,
+          new SerializableFunction<PipelineOptions, BigtableService>() {
+        @Override
+        public BigtableService apply(PipelineOptions options) {
+          return getBigtableService(options);
+        }
+      })));
       return PDone.in(input.getPipeline());
     }
 
@@ -489,7 +517,9 @@ public class BigtableIO {
       checkArgument(!tableId.isEmpty(), "Table ID not specified");
       try {
         checkArgument(
-            getBigtableService().tableExists(tableId), "Table %s does not exist", tableId);
+            getBigtableService(input.getPipeline().getOptions()).tableExists(tableId),
+            "Table %s does not exist",
+            tableId);
       } catch (IOException e) {
         logger.warn("Error checking whether table {} exists; proceeding.", tableId, e);
       }
@@ -533,29 +563,40 @@ public class BigtableIO {
      * Helper function that either returns the mock Bigtable service supplied by
      * {@link #withBigtableService} or creates and returns an implementation that talks to
      * {@code Cloud Bigtable}.
+     *
+     * <p>Also populate the credentials option from {@link GcpOptions#getGcpCredential()} if the
+     * default credentials are being used on {@link BigtableOptions}.
      */
-    private BigtableService getBigtableService() {
+    @VisibleForTesting
+    BigtableService getBigtableService(PipelineOptions pipelineOptions) {
       if (bigtableService != null) {
         return bigtableService;
       }
-      return new BigtableServiceImpl(options);
+      BigtableOptions.Builder clonedOptions = options.toBuilder();
+      if (options.getCredentialOptions().getCredentialType() == CredentialType.DefaultCredentials) {
+        clonedOptions.setCredentialOptions(
+            CredentialOptions.credential(
+                pipelineOptions.as(GcpOptions.class).getGcpCredential()));
+      }
+      return new BigtableServiceImpl(clonedOptions.build());
     }
 
     private class BigtableWriterFn extends DoFn<KV<ByteString, Iterable<Mutation>>, Void> {
 
-      public BigtableWriterFn(String tableId, BigtableService bigtableService) {
+      public BigtableWriterFn(String tableId,
+          SerializableFunction<PipelineOptions, BigtableService> bigtableServiceFactory) {
         this.tableId = checkNotNull(tableId, "tableId");
-        this.bigtableService = checkNotNull(bigtableService, "bigtableService");
+        this.bigtableServiceFactory =
+            checkNotNull(bigtableServiceFactory, "bigtableServiceFactory");
         this.failures = new ConcurrentLinkedQueue<>();
       }
 
-      @Setup
-      public void setup() throws Exception {
-        bigtableWriter = bigtableService.openForWriting(tableId);
-      }
-
       @StartBundle
-      public void startBundle(Context c) {
+      public void startBundle(Context c) throws IOException {
+        if (bigtableWriter == null) {
+          bigtableWriter = bigtableServiceFactory.apply(
+              c.getPipelineOptions()).openForWriting(tableId);
+        }
         recordsWritten = 0;
       }
 
@@ -587,7 +628,7 @@ public class BigtableIO {
 
       ///////////////////////////////////////////////////////////////////////////////
       private final String tableId;
-      private final BigtableService bigtableService;
+      private final SerializableFunction<PipelineOptions, BigtableService> bigtableServiceFactory;
       private BigtableService.Writer bigtableWriter;
       private long recordsWritten;
       private final ConcurrentLinkedQueue<BigtableWriteException> failures;
@@ -645,12 +686,12 @@ public class BigtableIO {
 
   static class BigtableSource extends BoundedSource<Row> {
     public BigtableSource(
-        BigtableService service,
+        SerializableFunction<PipelineOptions, BigtableService> serviceFactory,
         String tableId,
         @Nullable RowFilter filter,
         ByteKeyRange range,
         @Nullable Long estimatedSizeBytes) {
-      this.service = service;
+      this.serviceFactory = serviceFactory;
       this.tableId = tableId;
       this.filter = filter;
       this.range = range;
@@ -668,7 +709,7 @@ public class BigtableIO {
     }
 
     ////// Private state and internal implementation details //////
-    private final BigtableService service;
+    private final SerializableFunction<PipelineOptions, BigtableService> serviceFactory;
     private final String tableId;
     @Nullable private final RowFilter filter;
     private final ByteKeyRange range;
@@ -678,18 +719,18 @@ public class BigtableIO {
     protected BigtableSource withStartKey(ByteKey startKey) {
       checkNotNull(startKey, "startKey");
       return new BigtableSource(
-          service, tableId, filter, range.withStartKey(startKey), estimatedSizeBytes);
+          serviceFactory, tableId, filter, range.withStartKey(startKey), estimatedSizeBytes);
     }
 
     protected BigtableSource withEndKey(ByteKey endKey) {
       checkNotNull(endKey, "endKey");
       return new BigtableSource(
-          service, tableId, filter, range.withEndKey(endKey), estimatedSizeBytes);
+          serviceFactory, tableId, filter, range.withEndKey(endKey), estimatedSizeBytes);
     }
 
     protected BigtableSource withEstimatedSizeBytes(Long estimatedSizeBytes) {
       checkNotNull(estimatedSizeBytes, "estimatedSizeBytes");
-      return new BigtableSource(service, tableId, filter, range, estimatedSizeBytes);
+      return new BigtableSource(serviceFactory, tableId, filter, range, estimatedSizeBytes);
     }
 
     /**
@@ -697,8 +738,9 @@ public class BigtableIO {
      * boundaries and estimated sizes. We can use these samples to ensure that splits are on
      * different tablets, and possibly generate sub-splits within tablets.
      */
-    private List<SampleRowKeysResponse> getSampleRowKeys() throws IOException {
-      return service.getSampleRowKeys(this);
+    private List<SampleRowKeysResponse> getSampleRowKeys(PipelineOptions pipelineOptions)
+        throws IOException {
+      return serviceFactory.apply(pipelineOptions).getSampleRowKeys(this);
     }
 
     @Override
@@ -712,7 +754,7 @@ public class BigtableIO {
           Math.max(sizeEstimate / maximumNumberOfSplits, desiredBundleSizeBytes);
 
       // Delegate to testable helper.
-      return splitIntoBundlesBasedOnSamples(desiredBundleSizeBytes, getSampleRowKeys());
+      return splitIntoBundlesBasedOnSamples(desiredBundleSizeBytes, getSampleRowKeys(options));
     }
 
     /** Helper that splits this source into bundles based on Cloud Bigtable sampled row keys. */
@@ -798,7 +840,7 @@ public class BigtableIO {
     public long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
       // Delegate to testable helper.
       if (estimatedSizeBytes == null) {
-        estimatedSizeBytes = getEstimatedSizeBytesBasedOnSamples(getSampleRowKeys());
+        estimatedSizeBytes = getEstimatedSizeBytesBasedOnSamples(getSampleRowKeys(options));
       }
       return estimatedSizeBytes;
     }
@@ -840,7 +882,7 @@ public class BigtableIO {
 
     @Override
     public BoundedReader<Row> createReader(PipelineOptions options) throws IOException {
-      return new BigtableReader(this, service);
+      return new BigtableReader(this, serviceFactory.apply(options));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0ebb7a1e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
index c656bbb..ecb7b32 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
@@ -21,6 +21,7 @@ import com.google.bigtable.v2.MutateRowResponse;
 import com.google.bigtable.v2.Mutation;
 import com.google.bigtable.v2.Row;
 import com.google.bigtable.v2.SampleRowKeysResponse;
+import com.google.cloud.bigtable.config.BigtableOptions;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
@@ -94,6 +95,11 @@ interface BigtableService extends Serializable {
   }
 
   /**
+   * Returns the BigtableOptions used to configure this BigtableService.
+   */
+  BigtableOptions getBigtableOptions();
+
+  /**
    * Returns {@code true} if the table with the give name exists.
    */
   boolean tableExists(String tableId) throws IOException;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0ebb7a1e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index a402643..7ce4b4a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -61,6 +61,11 @@ class BigtableServiceImpl implements BigtableService {
   private final BigtableOptions options;
 
   @Override
+  public BigtableOptions getBigtableOptions() {
+    return options;
+  }
+
+  @Override
   public BigtableWriterImpl openForWriting(String tableId) throws IOException {
     BigtableSession session = new BigtableSession(options);
     BigtableTableName tableName = options.getInstanceName().toTableName(tableId);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0ebb7a1e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 3ca2b64..98215df 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -49,6 +49,8 @@ import com.google.bigtable.v2.RowFilter;
 import com.google.bigtable.v2.SampleRowKeysResponse;
 import com.google.cloud.bigtable.config.BigtableOptions;
 import com.google.cloud.bigtable.config.BulkOptions;
+import com.google.cloud.bigtable.config.CredentialOptions;
+import com.google.cloud.bigtable.config.CredentialOptions.CredentialType;
 import com.google.cloud.bigtable.config.RetryOptions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
@@ -77,12 +79,17 @@ import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
 import org.apache.beam.sdk.io.range.ByteKey;
 import org.apache.beam.sdk.io.range.ByteKeyRange;
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+import org.apache.beam.sdk.util.TestCredential;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -108,6 +115,13 @@ public class BigtableIOTest {
    * otherwise.
    */
   private static FakeBigtableService service;
+  private static SerializableFunction<PipelineOptions, BigtableService> serviceFactory =
+      new SerializableFunction<PipelineOptions, BigtableService>() {
+        @Override
+        public BigtableService apply(PipelineOptions input) {
+          return service;
+        }
+  };
   private static final BigtableOptions BIGTABLE_OPTIONS =
       new BigtableOptions.Builder()
           .setProjectId("project")
@@ -200,6 +214,50 @@ public class BigtableIOTest {
     return KV.of(ByteString.copyFromUtf8(key), mutations);
   }
 
+  /** Tests that credentials are used from PipelineOptions if not supplied by BigtableOptions. */
+  @Test
+  public void testUsePipelineOptionsCredentialsIfNotSpecifiedInBigtableOptions() throws Exception {
+    BigtableOptions options = BIGTABLE_OPTIONS.toBuilder()
+        .setCredentialOptions(CredentialOptions.defaultCredentials())
+        .build();
+    GcpOptions pipelineOptions = PipelineOptionsFactory.as(GcpOptions.class);
+    pipelineOptions.setGcpCredential(new TestCredential());
+    BigtableService readService = BigtableIO.read()
+        .withBigtableOptions(options)
+        .withTableId("TEST-TABLE")
+        .getBigtableService(pipelineOptions);
+    BigtableService writeService = BigtableIO.write()
+        .withBigtableOptions(options)
+        .withTableId("TEST-TABLE")
+        .getBigtableService(pipelineOptions);
+    assertEquals(CredentialType.SuppliedCredentials,
+        readService.getBigtableOptions().getCredentialOptions().getCredentialType());
+    assertEquals(CredentialType.SuppliedCredentials,
+        writeService.getBigtableOptions().getCredentialOptions().getCredentialType());
+  }
+
+  /** Tests that credentials are not used from PipelineOptions if supplied by BigtableOptions. */
+  @Test
+  public void testDontUsePipelineOptionsCredentialsIfSpecifiedInBigtableOptions() throws Exception {
+    BigtableOptions options = BIGTABLE_OPTIONS.toBuilder()
+        .setCredentialOptions(CredentialOptions.nullCredential())
+        .build();
+    GcpOptions pipelineOptions = PipelineOptionsFactory.as(GcpOptions.class);
+    pipelineOptions.setGcpCredential(new TestCredential());
+    BigtableService readService = BigtableIO.read()
+        .withBigtableOptions(options)
+        .withTableId("TEST-TABLE")
+        .getBigtableService(pipelineOptions);
+    BigtableService writeService = BigtableIO.write()
+        .withBigtableOptions(options)
+        .withTableId("TEST-TABLE")
+        .getBigtableService(pipelineOptions);
+    assertEquals(CredentialType.None,
+        readService.getBigtableOptions().getCredentialOptions().getCredentialType());
+    assertEquals(CredentialType.None,
+        writeService.getBigtableOptions().getCredentialOptions().getCredentialType());
+  }
+
   /** Tests that when reading from a non-existent table, the read fails. */
   @Test
   public void testReadingFailsTableDoesNotExist() throws Exception {
@@ -359,7 +417,7 @@ public class BigtableIOTest {
     service.setupSampleRowKeys(table, numSamples, bytesPerRow);
 
     BigtableSource source =
-        new BigtableSource(service, table, null, service.getTableRange(table), null);
+        new BigtableSource(serviceFactory, table, null, service.getTableRange(table), null);
     assertSplitAtFractionExhaustive(source, null);
   }
 
@@ -376,7 +434,7 @@ public class BigtableIOTest {
     service.setupSampleRowKeys(table, numSamples, bytesPerRow);
 
     BigtableSource source =
-        new BigtableSource(service, table, null, service.getTableRange(table), null);
+        new BigtableSource(serviceFactory, table, null, service.getTableRange(table), null);
     // With 0 items read, all split requests will fail.
     assertSplitAtFractionFails(source, 0, 0.1, null /* options */);
     assertSplitAtFractionFails(source, 0, 1.0, null /* options */);
@@ -406,7 +464,11 @@ public class BigtableIOTest {
 
     // Generate source and split it.
     BigtableSource source =
-        new BigtableSource(service, table, null /*filter*/, ByteKeyRange.ALL_KEYS, null /*size*/);
+        new BigtableSource(serviceFactory,
+            table,
+            null /*filter*/,
+            ByteKeyRange.ALL_KEYS,
+            null /*size*/);
     List<BigtableSource> splits =
         source.splitIntoBundles(numRows * bytesPerRow / numSamples, null /* options */);
 
@@ -430,7 +492,11 @@ public class BigtableIOTest {
 
     // Generate source and split it.
     BigtableSource source =
-        new BigtableSource(service, table, null /*filter*/, ByteKeyRange.ALL_KEYS, null /*size*/);
+        new BigtableSource(serviceFactory,
+        table,
+        null /*filter*/,
+        ByteKeyRange.ALL_KEYS,
+        null /*size*/);
     List<BigtableSource> splits = source.splitIntoBundles(numRows * bytesPerRow / numSplits, null);
 
     // Test num splits and split equality.
@@ -455,7 +521,7 @@ public class BigtableIOTest {
     RowFilter filter =
         RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(".*17.*")).build();
     BigtableSource source =
-        new BigtableSource(service, table, filter, ByteKeyRange.ALL_KEYS, null /*size*/);
+        new BigtableSource(serviceFactory, table, filter, ByteKeyRange.ALL_KEYS, null /*size*/);
     List<BigtableSource> splits = source.splitIntoBundles(numRows * bytesPerRow / numSplits, null);
 
     // Test num splits and split equality.
@@ -589,7 +655,7 @@ public class BigtableIOTest {
     makeTableData(table, numRows);
 
     BigtableSource source =
-        new BigtableSource(service, table, null, ByteKeyRange.ALL_KEYS, null);
+        new BigtableSource(serviceFactory, table, null, ByteKeyRange.ALL_KEYS, null);
 
     BoundedReader<Row> reader = source.createReader(TestPipeline.testingPipelineOptions());
 
@@ -711,6 +777,11 @@ public class BigtableIOTest {
     private final Map<String, SortedMap<ByteString, ByteString>> tables = new HashMap<>();
     private final Map<String, List<SampleRowKeysResponse>> sampleRowKeys = new HashMap<>();
 
+    @Override
+    public BigtableOptions getBigtableOptions() {
+      return null;
+    }
+
     @Nullable
     public SortedMap<ByteString, ByteString> getTable(String tableId) {
       return tables.get(tableId);