You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/10 18:34:03 UTC
[1/2] incubator-beam git commit: This closes #1308
Repository: incubator-beam
Updated Branches:
refs/heads/master 8d403a289 -> ab06647f9
This closes #1308
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ab06647f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ab06647f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ab06647f
Branch: refs/heads/master
Commit: ab06647f9d9c4fb72d2571dc2225b71b9fbdffbd
Parents: 8d403a2 0ebb7a1
Author: Thomas Groh <tg...@google.com>
Authored: Thu Nov 10 10:27:01 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Nov 10 10:27:01 2016 -0800
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 98 ++++++++++++++------
.../sdk/io/gcp/bigtable/BigtableService.java | 6 ++
.../io/gcp/bigtable/BigtableServiceImpl.java | 5 +
.../sdk/io/gcp/bigtable/BigtableIOTest.java | 83 +++++++++++++++--
4 files changed, 158 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
[2/2] incubator-beam git commit: Use Credentials from GcpOptions
instead of BigtableOptions
Posted by tg...@apache.org.
Use Credentials from GcpOptions instead of BigtableOptions
Fixes [BEAM-939]
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0ebb7a1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0ebb7a1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0ebb7a1e
Branch: refs/heads/master
Commit: 0ebb7a1edb98c035b54d382a2e2d77331dac829a
Parents: 8d403a2
Author: Luke Cwik <lc...@google.com>
Authored: Mon Nov 7 19:37:42 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Nov 10 10:27:01 2016 -0800
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 98 ++++++++++++++------
.../sdk/io/gcp/bigtable/BigtableService.java | 6 ++
.../io/gcp/bigtable/BigtableServiceImpl.java | 5 +
.../sdk/io/gcp/bigtable/BigtableIOTest.java | 83 +++++++++++++++--
4 files changed, 158 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0ebb7a1e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 90b9584..1ee9253 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -27,7 +27,10 @@ import com.google.bigtable.v2.Row;
import com.google.bigtable.v2.RowFilter;
import com.google.bigtable.v2.SampleRowKeysResponse;
import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.CredentialOptions;
+import com.google.cloud.bigtable.config.CredentialOptions.CredentialType;
import com.google.cloud.bigtable.config.RetryOptions;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
@@ -49,11 +52,13 @@ import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
+import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.values.KV;
@@ -283,7 +288,12 @@ public class BigtableIO {
@Override
public PCollection<Row> apply(PBegin input) {
BigtableSource source =
- new BigtableSource(getBigtableService(), tableId, filter, keyRange, null);
+ new BigtableSource(new SerializableFunction<PipelineOptions, BigtableService>() {
+ @Override
+ public BigtableService apply(PipelineOptions options) {
+ return getBigtableService(options);
+ }
+ }, tableId, filter, keyRange, null);
return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
}
@@ -293,7 +303,9 @@ public class BigtableIO {
checkArgument(!tableId.isEmpty(), "Table ID not specified");
try {
checkArgument(
- getBigtableService().tableExists(tableId), "Table %s does not exist", tableId);
+ getBigtableService(input.getPipeline().getOptions()).tableExists(tableId),
+ "Table %s does not exist",
+ tableId);
} catch (IOException e) {
logger.warn("Error checking whether table {} exists; proceeding.", tableId, e);
}
@@ -372,12 +384,22 @@ public class BigtableIO {
* Helper function that either returns the mock Bigtable service supplied by
* {@link #withBigtableService} or creates and returns an implementation that talks to
* {@code Cloud Bigtable}.
+ *
+ * <p>Also populates the credentials option from {@link GcpOptions#getGcpCredential()} if the
+ * default credentials are being used on {@link BigtableOptions}.
*/
- private BigtableService getBigtableService() {
+ @VisibleForTesting
+ BigtableService getBigtableService(PipelineOptions pipelineOptions) {
if (bigtableService != null) {
return bigtableService;
}
- return new BigtableServiceImpl(options);
+ BigtableOptions.Builder clonedOptions = options.toBuilder();
+ if (options.getCredentialOptions().getCredentialType() == CredentialType.DefaultCredentials) {
+ clonedOptions.setCredentialOptions(
+ CredentialOptions.credential(
+ pipelineOptions.as(GcpOptions.class).getGcpCredential()));
+ }
+ return new BigtableServiceImpl(clonedOptions.build());
}
}
@@ -479,7 +501,13 @@ public class BigtableIO {
@Override
public PDone apply(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
- input.apply(ParDo.of(new BigtableWriterFn(tableId, getBigtableService())));
+ input.apply(ParDo.of(new BigtableWriterFn(tableId,
+ new SerializableFunction<PipelineOptions, BigtableService>() {
+ @Override
+ public BigtableService apply(PipelineOptions options) {
+ return getBigtableService(options);
+ }
+ })));
return PDone.in(input.getPipeline());
}
@@ -489,7 +517,9 @@ public class BigtableIO {
checkArgument(!tableId.isEmpty(), "Table ID not specified");
try {
checkArgument(
- getBigtableService().tableExists(tableId), "Table %s does not exist", tableId);
+ getBigtableService(input.getPipeline().getOptions()).tableExists(tableId),
+ "Table %s does not exist",
+ tableId);
} catch (IOException e) {
logger.warn("Error checking whether table {} exists; proceeding.", tableId, e);
}
@@ -533,29 +563,40 @@ public class BigtableIO {
* Helper function that either returns the mock Bigtable service supplied by
* {@link #withBigtableService} or creates and returns an implementation that talks to
* {@code Cloud Bigtable}.
+ *
+ * <p>Also populates the credentials option from {@link GcpOptions#getGcpCredential()} if the
+ * default credentials are being used on {@link BigtableOptions}.
*/
- private BigtableService getBigtableService() {
+ @VisibleForTesting
+ BigtableService getBigtableService(PipelineOptions pipelineOptions) {
if (bigtableService != null) {
return bigtableService;
}
- return new BigtableServiceImpl(options);
+ BigtableOptions.Builder clonedOptions = options.toBuilder();
+ if (options.getCredentialOptions().getCredentialType() == CredentialType.DefaultCredentials) {
+ clonedOptions.setCredentialOptions(
+ CredentialOptions.credential(
+ pipelineOptions.as(GcpOptions.class).getGcpCredential()));
+ }
+ return new BigtableServiceImpl(clonedOptions.build());
}
private class BigtableWriterFn extends DoFn<KV<ByteString, Iterable<Mutation>>, Void> {
- public BigtableWriterFn(String tableId, BigtableService bigtableService) {
+ public BigtableWriterFn(String tableId,
+ SerializableFunction<PipelineOptions, BigtableService> bigtableServiceFactory) {
this.tableId = checkNotNull(tableId, "tableId");
- this.bigtableService = checkNotNull(bigtableService, "bigtableService");
+ this.bigtableServiceFactory =
+ checkNotNull(bigtableServiceFactory, "bigtableServiceFactory");
this.failures = new ConcurrentLinkedQueue<>();
}
- @Setup
- public void setup() throws Exception {
- bigtableWriter = bigtableService.openForWriting(tableId);
- }
-
@StartBundle
- public void startBundle(Context c) {
+ public void startBundle(Context c) throws IOException {
+ if (bigtableWriter == null) {
+ bigtableWriter = bigtableServiceFactory.apply(
+ c.getPipelineOptions()).openForWriting(tableId);
+ }
recordsWritten = 0;
}
@@ -587,7 +628,7 @@ public class BigtableIO {
///////////////////////////////////////////////////////////////////////////////
private final String tableId;
- private final BigtableService bigtableService;
+ private final SerializableFunction<PipelineOptions, BigtableService> bigtableServiceFactory;
private BigtableService.Writer bigtableWriter;
private long recordsWritten;
private final ConcurrentLinkedQueue<BigtableWriteException> failures;
@@ -645,12 +686,12 @@ public class BigtableIO {
static class BigtableSource extends BoundedSource<Row> {
public BigtableSource(
- BigtableService service,
+ SerializableFunction<PipelineOptions, BigtableService> serviceFactory,
String tableId,
@Nullable RowFilter filter,
ByteKeyRange range,
@Nullable Long estimatedSizeBytes) {
- this.service = service;
+ this.serviceFactory = serviceFactory;
this.tableId = tableId;
this.filter = filter;
this.range = range;
@@ -668,7 +709,7 @@ public class BigtableIO {
}
////// Private state and internal implementation details //////
- private final BigtableService service;
+ private final SerializableFunction<PipelineOptions, BigtableService> serviceFactory;
private final String tableId;
@Nullable private final RowFilter filter;
private final ByteKeyRange range;
@@ -678,18 +719,18 @@ public class BigtableIO {
protected BigtableSource withStartKey(ByteKey startKey) {
checkNotNull(startKey, "startKey");
return new BigtableSource(
- service, tableId, filter, range.withStartKey(startKey), estimatedSizeBytes);
+ serviceFactory, tableId, filter, range.withStartKey(startKey), estimatedSizeBytes);
}
protected BigtableSource withEndKey(ByteKey endKey) {
checkNotNull(endKey, "endKey");
return new BigtableSource(
- service, tableId, filter, range.withEndKey(endKey), estimatedSizeBytes);
+ serviceFactory, tableId, filter, range.withEndKey(endKey), estimatedSizeBytes);
}
protected BigtableSource withEstimatedSizeBytes(Long estimatedSizeBytes) {
checkNotNull(estimatedSizeBytes, "estimatedSizeBytes");
- return new BigtableSource(service, tableId, filter, range, estimatedSizeBytes);
+ return new BigtableSource(serviceFactory, tableId, filter, range, estimatedSizeBytes);
}
/**
@@ -697,8 +738,9 @@ public class BigtableIO {
* boundaries and estimated sizes. We can use these samples to ensure that splits are on
* different tablets, and possibly generate sub-splits within tablets.
*/
- private List<SampleRowKeysResponse> getSampleRowKeys() throws IOException {
- return service.getSampleRowKeys(this);
+ private List<SampleRowKeysResponse> getSampleRowKeys(PipelineOptions pipelineOptions)
+ throws IOException {
+ return serviceFactory.apply(pipelineOptions).getSampleRowKeys(this);
}
@Override
@@ -712,7 +754,7 @@ public class BigtableIO {
Math.max(sizeEstimate / maximumNumberOfSplits, desiredBundleSizeBytes);
// Delegate to testable helper.
- return splitIntoBundlesBasedOnSamples(desiredBundleSizeBytes, getSampleRowKeys());
+ return splitIntoBundlesBasedOnSamples(desiredBundleSizeBytes, getSampleRowKeys(options));
}
/** Helper that splits this source into bundles based on Cloud Bigtable sampled row keys. */
@@ -798,7 +840,7 @@ public class BigtableIO {
public long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
// Delegate to testable helper.
if (estimatedSizeBytes == null) {
- estimatedSizeBytes = getEstimatedSizeBytesBasedOnSamples(getSampleRowKeys());
+ estimatedSizeBytes = getEstimatedSizeBytesBasedOnSamples(getSampleRowKeys(options));
}
return estimatedSizeBytes;
}
@@ -840,7 +882,7 @@ public class BigtableIO {
@Override
public BoundedReader<Row> createReader(PipelineOptions options) throws IOException {
- return new BigtableReader(this, service);
+ return new BigtableReader(this, serviceFactory.apply(options));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0ebb7a1e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
index c656bbb..ecb7b32 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
@@ -21,6 +21,7 @@ import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.Mutation;
import com.google.bigtable.v2.Row;
import com.google.bigtable.v2.SampleRowKeysResponse;
+import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import java.io.IOException;
@@ -94,6 +95,11 @@ interface BigtableService extends Serializable {
}
/**
+ * Returns the BigtableOptions used to configure this BigtableService.
+ */
+ BigtableOptions getBigtableOptions();
+
+ /**
* Returns {@code true} if the table with the given name exists.
*/
boolean tableExists(String tableId) throws IOException;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0ebb7a1e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index a402643..7ce4b4a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -61,6 +61,11 @@ class BigtableServiceImpl implements BigtableService {
private final BigtableOptions options;
@Override
+ public BigtableOptions getBigtableOptions() {
+ return options;
+ }
+
+ @Override
public BigtableWriterImpl openForWriting(String tableId) throws IOException {
BigtableSession session = new BigtableSession(options);
BigtableTableName tableName = options.getInstanceName().toTableName(tableId);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0ebb7a1e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 3ca2b64..98215df 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -49,6 +49,8 @@ import com.google.bigtable.v2.RowFilter;
import com.google.bigtable.v2.SampleRowKeysResponse;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.config.BulkOptions;
+import com.google.cloud.bigtable.config.CredentialOptions;
+import com.google.cloud.bigtable.config.CredentialOptions.CredentialType;
import com.google.cloud.bigtable.config.RetryOptions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
@@ -77,12 +79,17 @@ import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.range.ByteKeyRange;
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+import org.apache.beam.sdk.util.TestCredential;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -108,6 +115,13 @@ public class BigtableIOTest {
* otherwise.
*/
private static FakeBigtableService service;
+ private static SerializableFunction<PipelineOptions, BigtableService> serviceFactory =
+ new SerializableFunction<PipelineOptions, BigtableService>() {
+ @Override
+ public BigtableService apply(PipelineOptions input) {
+ return service;
+ }
+ };
private static final BigtableOptions BIGTABLE_OPTIONS =
new BigtableOptions.Builder()
.setProjectId("project")
@@ -200,6 +214,50 @@ public class BigtableIOTest {
return KV.of(ByteString.copyFromUtf8(key), mutations);
}
+ /** Tests that credentials are used from PipelineOptions if not supplied by BigtableOptions. */
+ @Test
+ public void testUsePipelineOptionsCredentialsIfNotSpecifiedInBigtableOptions() throws Exception {
+ BigtableOptions options = BIGTABLE_OPTIONS.toBuilder()
+ .setCredentialOptions(CredentialOptions.defaultCredentials())
+ .build();
+ GcpOptions pipelineOptions = PipelineOptionsFactory.as(GcpOptions.class);
+ pipelineOptions.setGcpCredential(new TestCredential());
+ BigtableService readService = BigtableIO.read()
+ .withBigtableOptions(options)
+ .withTableId("TEST-TABLE")
+ .getBigtableService(pipelineOptions);
+ BigtableService writeService = BigtableIO.write()
+ .withBigtableOptions(options)
+ .withTableId("TEST-TABLE")
+ .getBigtableService(pipelineOptions);
+ assertEquals(CredentialType.SuppliedCredentials,
+ readService.getBigtableOptions().getCredentialOptions().getCredentialType());
+ assertEquals(CredentialType.SuppliedCredentials,
+ writeService.getBigtableOptions().getCredentialOptions().getCredentialType());
+ }
+
+ /** Tests that credentials are not used from PipelineOptions if supplied by BigtableOptions. */
+ @Test
+ public void testDontUsePipelineOptionsCredentialsIfSpecifiedInBigtableOptions() throws Exception {
+ BigtableOptions options = BIGTABLE_OPTIONS.toBuilder()
+ .setCredentialOptions(CredentialOptions.nullCredential())
+ .build();
+ GcpOptions pipelineOptions = PipelineOptionsFactory.as(GcpOptions.class);
+ pipelineOptions.setGcpCredential(new TestCredential());
+ BigtableService readService = BigtableIO.read()
+ .withBigtableOptions(options)
+ .withTableId("TEST-TABLE")
+ .getBigtableService(pipelineOptions);
+ BigtableService writeService = BigtableIO.write()
+ .withBigtableOptions(options)
+ .withTableId("TEST-TABLE")
+ .getBigtableService(pipelineOptions);
+ assertEquals(CredentialType.None,
+ readService.getBigtableOptions().getCredentialOptions().getCredentialType());
+ assertEquals(CredentialType.None,
+ writeService.getBigtableOptions().getCredentialOptions().getCredentialType());
+ }
+
/** Tests that when reading from a non-existent table, the read fails. */
@Test
public void testReadingFailsTableDoesNotExist() throws Exception {
@@ -359,7 +417,7 @@ public class BigtableIOTest {
service.setupSampleRowKeys(table, numSamples, bytesPerRow);
BigtableSource source =
- new BigtableSource(service, table, null, service.getTableRange(table), null);
+ new BigtableSource(serviceFactory, table, null, service.getTableRange(table), null);
assertSplitAtFractionExhaustive(source, null);
}
@@ -376,7 +434,7 @@ public class BigtableIOTest {
service.setupSampleRowKeys(table, numSamples, bytesPerRow);
BigtableSource source =
- new BigtableSource(service, table, null, service.getTableRange(table), null);
+ new BigtableSource(serviceFactory, table, null, service.getTableRange(table), null);
// With 0 items read, all split requests will fail.
assertSplitAtFractionFails(source, 0, 0.1, null /* options */);
assertSplitAtFractionFails(source, 0, 1.0, null /* options */);
@@ -406,7 +464,11 @@ public class BigtableIOTest {
// Generate source and split it.
BigtableSource source =
- new BigtableSource(service, table, null /*filter*/, ByteKeyRange.ALL_KEYS, null /*size*/);
+ new BigtableSource(serviceFactory,
+ table,
+ null /*filter*/,
+ ByteKeyRange.ALL_KEYS,
+ null /*size*/);
List<BigtableSource> splits =
source.splitIntoBundles(numRows * bytesPerRow / numSamples, null /* options */);
@@ -430,7 +492,11 @@ public class BigtableIOTest {
// Generate source and split it.
BigtableSource source =
- new BigtableSource(service, table, null /*filter*/, ByteKeyRange.ALL_KEYS, null /*size*/);
+ new BigtableSource(serviceFactory,
+ table,
+ null /*filter*/,
+ ByteKeyRange.ALL_KEYS,
+ null /*size*/);
List<BigtableSource> splits = source.splitIntoBundles(numRows * bytesPerRow / numSplits, null);
// Test num splits and split equality.
@@ -455,7 +521,7 @@ public class BigtableIOTest {
RowFilter filter =
RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(".*17.*")).build();
BigtableSource source =
- new BigtableSource(service, table, filter, ByteKeyRange.ALL_KEYS, null /*size*/);
+ new BigtableSource(serviceFactory, table, filter, ByteKeyRange.ALL_KEYS, null /*size*/);
List<BigtableSource> splits = source.splitIntoBundles(numRows * bytesPerRow / numSplits, null);
// Test num splits and split equality.
@@ -589,7 +655,7 @@ public class BigtableIOTest {
makeTableData(table, numRows);
BigtableSource source =
- new BigtableSource(service, table, null, ByteKeyRange.ALL_KEYS, null);
+ new BigtableSource(serviceFactory, table, null, ByteKeyRange.ALL_KEYS, null);
BoundedReader<Row> reader = source.createReader(TestPipeline.testingPipelineOptions());
@@ -711,6 +777,11 @@ public class BigtableIOTest {
private final Map<String, SortedMap<ByteString, ByteString>> tables = new HashMap<>();
private final Map<String, List<SampleRowKeysResponse>> sampleRowKeys = new HashMap<>();
+ @Override
+ public BigtableOptions getBigtableOptions() {
+ return null;
+ }
+
@Nullable
public SortedMap<ByteString, ByteString> getTable(String tableId) {
return tables.get(tableId);