You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/26 23:23:11 UTC
[1/3] incubator-beam git commit: [BEAM-77] Move bigtable in IO
Repository: incubator-beam
Updated Branches:
refs/heads/master 9746f0d1d -> aa43ec0b0
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
new file mode 100644
index 0000000..d1d5cd6
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <!-- Inherits shared build configuration from the io-parent module in the parent directory. -->
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>io-parent</artifactId>
+ <version>0.1.0-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>google-cloud-platform</artifactId>
+ <name>Apache Beam :: SDKs :: Java :: IO :: Google Cloud Platform</name>
+ <description>IO library to read and write Google Cloud Platform systems from Beam.</description>
+ <packaging>jar</packaging>
+
+ <properties>
+ <!-- Single version shared by the bigtable-protos and bigtable-client-core dependencies below. -->
+ <bigtable.version>0.2.3</bigtable.version>
+ </properties>
+
+ <dependencies>
+ <!-- compile (no explicit scope, so Maven's default compile scope applies) -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>java-sdk-all</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-all</artifactId>
+ <version>0.12.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.cloud.bigtable</groupId>
+ <artifactId>bigtable-protos</artifactId>
+ <version>${bigtable.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.cloud.bigtable</groupId>
+ <artifactId>bigtable-client-core</artifactId>
+ <version>${bigtable.version}</version>
+ </dependency>
+
+ <!-- test -->
+ <!-- Test utilities of the core SDK, consumed through its "tests" classifier artifact. -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>java-sdk-all</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <version>1.7.14</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
new file mode 100644
index 0000000..f99eb1d
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -0,0 +1,1016 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.Sink.WriteOperation;
+import org.apache.beam.sdk.io.Sink.Writer;
+import org.apache.beam.sdk.io.range.ByteKey;
+import org.apache.beam.sdk.io.range.ByteKeyRange;
+import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.ReleaseInfo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+import com.google.bigtable.v1.Mutation;
+import com.google.bigtable.v1.Row;
+import com.google.bigtable.v1.RowFilter;
+import com.google.bigtable.v1.SampleRowKeysResponse;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Empty;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import javax.annotation.Nullable;
+
+/**
+ * A bounded source and sink for Google Cloud Bigtable.
+ *
+ * <p>For more information, see the online documentation at
+ * <a href="https://cloud.google.com/bigtable/">Google Cloud Bigtable</a>.
+ *
+ * <h3>Reading from Cloud Bigtable</h3>
+ *
+ * <p>The Bigtable source returns a set of rows from a single table, returning a
+ * {@code PCollection<Row>}.
+ *
+ * <p>To configure a Cloud Bigtable source, you must supply a table id and a {@link BigtableOptions}
+ * or builder configured with the project and other information necessary to identify the
+ * Bigtable cluster. A {@link RowFilter} may also optionally be specified using
+ * {@link BigtableIO.Read#withRowFilter}. For example:
+ *
+ * <pre>{@code
+ * BigtableOptions.Builder optionsBuilder =
+ * new BigtableOptions.Builder()
+ * .setProjectId("project")
+ * .setClusterId("cluster")
+ * .setZoneId("zone");
+ *
+ * Pipeline p = ...;
+ *
+ * // Scan the entire table.
+ * p.apply("read",
+ * BigtableIO.read()
+ * .withBigtableOptions(optionsBuilder)
+ * .withTableId("table"));
+ *
+ * // Scan a subset of rows that match the specified row filter.
+ * p.apply("filtered read",
+ * BigtableIO.read()
+ * .withBigtableOptions(optionsBuilder)
+ * .withTableId("table")
+ * .withRowFilter(filter));
+ * }</pre>
+ *
+ * <h3>Writing to Cloud Bigtable</h3>
+ *
+ * <p>The Bigtable sink executes a set of row mutations on a single table. It takes as input a
+ * {@link PCollection PCollection<KV<ByteString, Iterable<Mutation>>>}, where the
+ * {@link ByteString} is the key of the row being mutated, and each {@link Mutation} represents an
+ * idempotent transformation to that row.
+ *
+ * <p>To configure a Cloud Bigtable sink, you must supply a table id and a {@link BigtableOptions}
+ * or builder configured with the project and other information necessary to identify the
+ * Bigtable cluster, for example:
+ *
+ * <pre>{@code
+ * BigtableOptions.Builder optionsBuilder =
+ * new BigtableOptions.Builder()
+ * .setProjectId("project")
+ * .setClusterId("cluster")
+ * .setZoneId("zone");
+ *
+ * PCollection<KV<ByteString, Iterable<Mutation>>> data = ...;
+ *
+ * data.apply("write",
+ * BigtableIO.write()
+ * .withBigtableOptions(optionsBuilder)
+ * .withTableId("table"));
+ * }</pre>
+ *
+ * <h3>Experimental</h3>
+ *
+ * <p>This connector for Cloud Bigtable is considered experimental and may break or receive
+ * backwards-incompatible changes in future versions of the Apache Beam SDK. Cloud Bigtable is
+ * in Beta, and thus it may introduce breaking changes in future revisions of its service or APIs.
+ *
+ * <h3>Permissions</h3>
+ *
+ * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
+ * pipeline. Please refer to the documentation of the corresponding
+ * {@link PipelineRunner PipelineRunners} for more details.
+ */
+@Experimental
+public class BigtableIO {
+ /** SLF4J logger shared by the transforms, source, reader and sink nested in {@link BigtableIO}. */
+ private static final Logger logger = LoggerFactory.getLogger(BigtableIO.class);
+
+ /**
+ * Creates an uninitialized {@link BigtableIO.Read}. Before use, the {@code Read} must be
+ * initialized with a
+ * {@link BigtableIO.Read#withBigtableOptions(BigtableOptions) BigtableOptions} that specifies
+ * the source Cloud Bigtable cluster, and a {@link BigtableIO.Read#withTableId tableId} that
+ * specifies which table to read. A {@link RowFilter} may also optionally be specified using
+ * {@link BigtableIO.Read#withRowFilter}.
+ */
+ @Experimental
+ public static Read read() {
+ return new Read(null, "", null, null);
+ }
+
+ /**
+ * Creates an uninitialized {@link BigtableIO.Write}. Before use, the {@code Write} must be
+ * initialized with a
+ * {@link BigtableIO.Write#withBigtableOptions(BigtableOptions) BigtableOptions} that specifies
+ * the destination Cloud Bigtable cluster, and a {@link BigtableIO.Write#withTableId tableId} that
+ * specifies which table to write.
+ */
+ @Experimental
+ public static Write write() {
+ return new Write(null, "", null);
+ }
+
+ /**
+ * A {@link PTransform} that reads from Google Cloud Bigtable. See the class-level Javadoc on
+ * {@link BigtableIO} for more information.
+ *
+ * @see BigtableIO
+ */
+ @Experimental
+ public static class Read extends PTransform<PBegin, PCollection<Row>> {
+ /**
+ * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster
+ * indicated by the given options, and using any other specified customizations.
+ *
+ * <p>Does not modify this object.
+ */
+ public Read withBigtableOptions(BigtableOptions options) {
+ checkNotNull(options, "options");
+ return withBigtableOptions(options.toBuilder());
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster
+ * indicated by the given options, and using any other specified customizations.
+ *
+ * <p>Clones the given {@link BigtableOptions} builder so that any further changes
+ * will have no effect on the returned {@link BigtableIO.Read}.
+ *
+ * <p>Does not modify this object.
+ */
+ public Read withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
+ checkNotNull(optionsBuilder, "optionsBuilder");
+ // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
+ BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
+ BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
+ return new Read(optionsWithAgent, tableId, filter, bigtableService);
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Read} that will filter the rows read from Cloud Bigtable
+ * using the given row filter.
+ *
+ * <p>Does not modify this object.
+ */
+ public Read withRowFilter(RowFilter filter) {
+ checkNotNull(filter, "filter");
+ return new Read(options, tableId, filter, bigtableService);
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Read} that will read from the specified table.
+ *
+ * <p>Does not modify this object.
+ */
+ public Read withTableId(String tableId) {
+ checkNotNull(tableId, "tableId");
+ return new Read(options, tableId, filter, bigtableService);
+ }
+
+ /**
+ * Returns the Google Cloud Bigtable cluster being read from, and other parameters.
+ */
+ public BigtableOptions getBigtableOptions() {
+ return options;
+ }
+
+ /**
+ * Returns the table being read from.
+ */
+ public String getTableId() {
+ return tableId;
+ }
+
+ @Override
+ public PCollection<Row> apply(PBegin input) {
+ BigtableSource source =
+ new BigtableSource(getBigtableService(), tableId, filter, ByteKeyRange.ALL_KEYS, null);
+ return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
+ }
+
+ @Override
+ public void validate(PBegin input) {
+ checkArgument(options != null, "BigtableOptions not specified");
+ checkArgument(!tableId.isEmpty(), "Table ID not specified");
+ try {
+ checkArgument(
+ getBigtableService().tableExists(tableId), "Table %s does not exist", tableId);
+ } catch (IOException e) {
+ logger.warn("Error checking whether table {} exists; proceeding.", tableId, e);
+ }
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+
+ builder.add("tableId", tableId);
+
+ if (options != null) {
+ builder.add("bigtableOptions", options.toString());
+ }
+
+ if (filter != null) {
+ builder.add("rowFilter", filter.toString());
+ }
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(Read.class)
+ .add("options", options)
+ .add("tableId", tableId)
+ .add("filter", filter)
+ .toString();
+ }
+
+ /////////////////////////////////////////////////////////////////////////////////////////
+ /**
+ * Used to define the Cloud Bigtable cluster and any options for the networking layer.
+ * Cannot actually be {@code null} at validation time, but may start out {@code null} while
+ * source is being built.
+ */
+ @Nullable private final BigtableOptions options;
+ private final String tableId;
+ @Nullable private final RowFilter filter;
+ @Nullable private final BigtableService bigtableService;
+
+ private Read(
+ @Nullable BigtableOptions options,
+ String tableId,
+ @Nullable RowFilter filter,
+ @Nullable BigtableService bigtableService) {
+ this.options = options;
+ this.tableId = checkNotNull(tableId, "tableId");
+ this.filter = filter;
+ this.bigtableService = bigtableService;
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Read} that will read using the given Cloud Bigtable
+ * service implementation.
+ *
+ * <p>This is used for testing.
+ *
+ * <p>Does not modify this object.
+ */
+ Read withBigtableService(BigtableService bigtableService) {
+ checkNotNull(bigtableService, "bigtableService");
+ return new Read(options, tableId, filter, bigtableService);
+ }
+
+ /**
+ * Helper function that either returns the mock Bigtable service supplied by
+ * {@link #withBigtableService} or creates and returns an implementation that talks to
+ * {@code Cloud Bigtable}.
+ */
+ private BigtableService getBigtableService() {
+ if (bigtableService != null) {
+ return bigtableService;
+ }
+ return new BigtableServiceImpl(options);
+ }
+ }
+
+  /**
+   * A {@link PTransform} that applies row mutations to a single Google Cloud Bigtable table. See
+   * the class-level Javadoc on {@link BigtableIO} for more information.
+   *
+   * <p>Every {@code with*} method returns a new {@code Write}; the receiver is never modified.
+   *
+   * @see BigtableIO
+   */
+  @Experimental
+  public static class Write
+      extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, PDone> {
+    /**
+     * Identifies the Cloud Bigtable cluster and configures the networking layer. Starts out
+     * {@code null} while the sink is being configured; {@link #validate} rejects a {@code null}
+     * value.
+     */
+    @Nullable private final BigtableOptions options;
+
+    /** Table to write to. The empty string means no table has been chosen yet. */
+    private final String tableId;
+
+    /**
+     * Service implementation injected by tests. When {@code null}, a {@link BigtableServiceImpl}
+     * is created from {@link #options} on demand.
+     */
+    @Nullable private final BigtableService bigtableService;
+
+    private Write(
+        @Nullable BigtableOptions options,
+        String tableId,
+        @Nullable BigtableService bigtableService) {
+      this.options = options;
+      this.tableId = checkNotNull(tableId, "tableId");
+      this.bigtableService = bigtableService;
+    }
+
+    /**
+     * Returns a copy of this transform that writes to the Cloud Bigtable cluster described by
+     * {@code options}, keeping every other setting.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withBigtableOptions(BigtableOptions options) {
+      return withBigtableOptions(checkNotNull(options, "options").toBuilder());
+    }
+
+    /**
+     * Returns a copy of this transform that writes to the Cloud Bigtable cluster described by
+     * {@code optionsBuilder}, keeping every other setting.
+     *
+     * <p>The builder is snapshotted immediately, so later changes to it have no effect on the
+     * returned {@link BigtableIO.Write}. The value of {@code getUserAgent()} is set as the user
+     * agent of the snapshot.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
+      checkNotNull(optionsBuilder, "optionsBuilder");
+      // Round-trip through build()/toBuilder() so that the caller's builder is not shared.
+      // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
+      BigtableOptions snapshot = optionsBuilder.build();
+      BigtableOptions stamped = snapshot.toBuilder().setUserAgent(getUserAgent()).build();
+      return new Write(stamped, tableId, bigtableService);
+    }
+
+    /**
+     * Returns a copy of this transform that writes to the table named {@code tableId}.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withTableId(String tableId) {
+      return new Write(options, checkNotNull(tableId, "tableId"), bigtableService);
+    }
+
+    /**
+     * Returns a copy of this transform that talks to {@code bigtableService} instead of the real
+     * Cloud Bigtable service. Intended for tests.
+     *
+     * <p>Does not modify this object.
+     */
+    Write withBigtableService(BigtableService bigtableService) {
+      checkNotNull(bigtableService, "bigtableService");
+      return new Write(options, tableId, bigtableService);
+    }
+
+    /** Returns the options describing the Cloud Bigtable cluster being written to. */
+    public BigtableOptions getBigtableOptions() {
+      return options;
+    }
+
+    /** Returns the id of the table being written to. */
+    public String getTableId() {
+      return tableId;
+    }
+
+    @Override
+    public PDone apply(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
+      Sink bigtableSink = new Sink(tableId, getBigtableService());
+      return input.apply(org.apache.beam.sdk.io.Write.to(bigtableSink));
+    }
+
+    @Override
+    public void validate(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
+      checkArgument(options != null, "BigtableOptions not specified");
+      checkArgument(!tableId.isEmpty(), "Table ID not specified");
+      boolean tableFound;
+      try {
+        tableFound = getBigtableService().tableExists(tableId);
+      } catch (IOException e) {
+        // Best effort only: failing to reach the service does not block pipeline construction.
+        logger.warn("Error checking whether table {} exists; proceeding.", tableId, e);
+        return;
+      }
+      checkArgument(tableFound, "Table %s does not exist", tableId);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add("tableId", tableId);
+      if (options != null) {
+        builder.add("bigtableOptions", options.toString());
+      }
+    }
+
+    @Override
+    public String toString() {
+      MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(Write.class);
+      helper.add("options", options);
+      helper.add("tableId", tableId);
+      return helper.toString();
+    }
+
+    /**
+     * Returns the test override installed by {@link #withBigtableService}, or otherwise a new
+     * {@link BigtableServiceImpl} built from the configured options.
+     */
+    private BigtableService getBigtableService() {
+      return bigtableService != null ? bigtableService : new BigtableServiceImpl(options);
+    }
+  }
+
+ //////////////////////////////////////////////////////////////////////////////////////////
+ /** Disallow construction of utility class. */
+ private BigtableIO() {}
+
+  /**
+   * A {@link BoundedSource} that scans one {@link ByteKeyRange} of a Cloud Bigtable table,
+   * optionally applying a {@link RowFilter}. Size estimation and splitting are driven by the
+   * sampled row keys returned from the {@link BigtableService}.
+   */
+  static class BigtableSource extends BoundedSource<Row> {
+    /**
+     * Creates a source over {@code range} of table {@code tableId}.
+     *
+     * @param service the Cloud Bigtable service used for sampling and reading
+     * @param tableId the table to read
+     * @param filter an optional row filter, or {@code null} to return every row
+     * @param range the key range to scan
+     * @param estimatedSizeBytes a precomputed size estimate, or {@code null} to compute it lazily
+     *     from sampled row keys on the first call to {@link #getEstimatedSizeBytes}
+     */
+    public BigtableSource(
+        BigtableService service,
+        String tableId,
+        @Nullable RowFilter filter,
+        ByteKeyRange range,
+        Long estimatedSizeBytes) {
+      this.service = service;
+      this.tableId = tableId;
+      this.filter = filter;
+      this.range = range;
+      this.estimatedSizeBytes = estimatedSizeBytes;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(BigtableSource.class)
+          .add("tableId", tableId)
+          .add("filter", filter)
+          .add("range", range)
+          .add("estimatedSizeBytes", estimatedSizeBytes)
+          .toString();
+    }
+
+    ////// Private state and internal implementation details //////
+    private final BigtableService service;
+    @Nullable private final String tableId;
+    @Nullable private final RowFilter filter;
+    private final ByteKeyRange range;
+    // Cached size estimate; filled in lazily by getEstimatedSizeBytes() when null.
+    @Nullable private Long estimatedSizeBytes;
+    // NOTE(review): not read or written inside this class; confirm whether it is still needed.
+    @Nullable private transient List<SampleRowKeysResponse> sampleRowKeys;
+
+    /**
+     * Returns a copy of this source whose range starts at {@code startKey}.
+     *
+     * <p>NOTE(review): the copy inherits this source's {@code estimatedSizeBytes} even though it
+     * covers a narrower range. Callers that know a better estimate override it with
+     * {@link #withEstimatedSizeBytes}. Confirm whether inheriting the value is intended.
+     */
+    protected BigtableSource withStartKey(ByteKey startKey) {
+      checkNotNull(startKey, "startKey");
+      return new BigtableSource(
+          service, tableId, filter, range.withStartKey(startKey), estimatedSizeBytes);
+    }
+
+    /**
+     * Returns a copy of this source whose range ends at {@code endKey}.
+     *
+     * <p>The copy inherits this source's {@code estimatedSizeBytes}, with the same caveat as
+     * {@link #withStartKey}.
+     */
+    protected BigtableSource withEndKey(ByteKey endKey) {
+      checkNotNull(endKey, "endKey");
+      return new BigtableSource(
+          service, tableId, filter, range.withEndKey(endKey), estimatedSizeBytes);
+    }
+
+    /** Returns a copy of this source that reports {@code estimatedSizeBytes} as its size. */
+    protected BigtableSource withEstimatedSizeBytes(Long estimatedSizeBytes) {
+      checkNotNull(estimatedSizeBytes, "estimatedSizeBytes");
+      return new BigtableSource(service, tableId, filter, range, estimatedSizeBytes);
+    }
+
+    /**
+     * Makes an API call to the Cloud Bigtable service that gives information about tablet key
+     * boundaries and estimated sizes. We can use these samples to ensure that splits are on
+     * different tablets, and possibly generate sub-splits within tablets.
+     */
+    private List<SampleRowKeysResponse> getSampleRowKeys() throws IOException {
+      return service.getSampleRowKeys(this);
+    }
+
+    @Override
+    public List<BigtableSource> splitIntoBundles(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      // Update the desiredBundleSizeBytes in order to limit the
+      // number of splits to maximumNumberOfSplits.
+      long maximumNumberOfSplits = 4000;
+      long sizeEstimate = getEstimatedSizeBytes(options);
+      desiredBundleSizeBytes =
+          Math.max(sizeEstimate / maximumNumberOfSplits, desiredBundleSizeBytes);
+
+      // Delegate to testable helper.
+      return splitIntoBundlesBasedOnSamples(desiredBundleSizeBytes, getSampleRowKeys());
+    }
+
+    /**
+     * Splits this source into bundles based on Cloud Bigtable sampled row keys.
+     *
+     * <p>Always returns at least one source. If there are no samples, or if no sample region
+     * overlaps the scan range, the result is this source unsplit.
+     */
+    private List<BigtableSource> splitIntoBundlesBasedOnSamples(
+        long desiredBundleSizeBytes, List<SampleRowKeysResponse> sampleRowKeys) {
+      // There are no regions, or no samples available. Just scan the entire range.
+      if (sampleRowKeys.isEmpty()) {
+        logger.info("Not splitting source {} because no sample row keys are available.", this);
+        return Collections.singletonList(this);
+      }
+
+      logger.info(
+          "About to split into bundles of size {} with sampleRowKeys length {} first element {}",
+          desiredBundleSizeBytes,
+          sampleRowKeys.size(),
+          sampleRowKeys.get(0));
+
+      // Loop through all sampled responses and generate splits from the ones that overlap the
+      // scan range. The main complication is that we must track the end range of the previous
+      // sample to generate good ranges.
+      ByteKey lastEndKey = ByteKey.EMPTY;
+      long lastOffset = 0;
+      ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder();
+      for (SampleRowKeysResponse response : sampleRowKeys) {
+        ByteKey responseEndKey = ByteKey.of(response.getRowKey());
+        long responseOffset = response.getOffsetBytes();
+        checkState(
+            responseOffset >= lastOffset,
+            "Expected response byte offset %s to come after the last offset %s",
+            responseOffset,
+            lastOffset);
+
+        if (!range.overlaps(ByteKeyRange.of(lastEndKey, responseEndKey))) {
+          // This region does not overlap the scan, so skip it.
+          lastOffset = responseOffset;
+          lastEndKey = responseEndKey;
+          continue;
+        }
+
+        // Calculate the beginning of the split as the larger of startKey and the end of the last
+        // split. Unspecified start is smallest key so is correctly treated as earliest key.
+        ByteKey splitStartKey = lastEndKey;
+        if (splitStartKey.compareTo(range.getStartKey()) < 0) {
+          splitStartKey = range.getStartKey();
+        }
+
+        // Calculate the end of the split as the smaller of endKey and the end of this sample. Note
+        // that range.containsKey handles the case when range.getEndKey() is empty.
+        ByteKey splitEndKey = responseEndKey;
+        if (!range.containsKey(splitEndKey)) {
+          splitEndKey = range.getEndKey();
+        }
+
+        // We know this region overlaps the desired key range, and we know a rough estimate of its
+        // size. Split the key range into bundle-sized chunks and then add them all as splits.
+        long sampleSizeBytes = responseOffset - lastOffset;
+        List<BigtableSource> subSplits =
+            splitKeyRangeIntoBundleSizedSubranges(
+                sampleSizeBytes,
+                desiredBundleSizeBytes,
+                ByteKeyRange.of(splitStartKey, splitEndKey));
+        splits.addAll(subSplits);
+
+        // Move to the next region.
+        lastEndKey = responseEndKey;
+        lastOffset = responseOffset;
+      }
+
+      // We must add one more region after the end of the samples if both these conditions hold:
+      // 1. we did not scan to the end yet (lastEndKey is concrete, not 0-length).
+      // 2. we want to scan to the end (endKey is empty) or farther (lastEndKey < endKey).
+      if (!lastEndKey.isEmpty()
+          && (range.getEndKey().isEmpty() || lastEndKey.compareTo(range.getEndKey()) < 0)) {
+        splits.add(this.withStartKey(lastEndKey).withEndKey(range.getEndKey()));
+      }
+
+      List<BigtableSource> ret = splits.build();
+      if (ret.isEmpty()) {
+        // No sample region overlapped the scan range and no trailing region was needed. Fall
+        // back to a single bundle, like the "no samples" case above, instead of calling
+        // ret.get(0) on an empty list below.
+        logger.info("Not splitting source {} because no sample regions overlap its range.", this);
+        return Collections.singletonList(this);
+      }
+      logger.info("Generated {} splits. First split: {}", ret.size(), ret.get(0));
+      return ret;
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
+      // Delegate to testable helper; the result is cached in estimatedSizeBytes.
+      if (estimatedSizeBytes == null) {
+        estimatedSizeBytes = getEstimatedSizeBytesBasedOnSamples(getSampleRowKeys());
+      }
+      return estimatedSizeBytes;
+    }
+
+    /**
+     * Computes the estimated size in bytes based on the total size of all samples that overlap
+     * the key range this source will scan.
+     */
+    private long getEstimatedSizeBytesBasedOnSamples(List<SampleRowKeysResponse> samples) {
+      long estimatedSizeBytes = 0;
+      long lastOffset = 0;
+      ByteKey currentStartKey = ByteKey.EMPTY;
+      // Compute the total estimated size as the size of each sample that overlaps the scan range.
+      // TODO: In future, Bigtable service may provide finer grained APIs, e.g., to sample given a
+      // filter or to sample on a given key range.
+      for (SampleRowKeysResponse response : samples) {
+        ByteKey currentEndKey = ByteKey.of(response.getRowKey());
+        long currentOffset = response.getOffsetBytes();
+        if (!currentStartKey.isEmpty() && currentStartKey.equals(currentEndKey)) {
+          // Skip an empty region.
+          lastOffset = currentOffset;
+          continue;
+        } else if (range.overlaps(ByteKeyRange.of(currentStartKey, currentEndKey))) {
+          estimatedSizeBytes += currentOffset - lastOffset;
+        }
+        currentStartKey = currentEndKey;
+        lastOffset = currentOffset;
+      }
+      return estimatedSizeBytes;
+    }
+
+    /**
+     * Cloud Bigtable returns query results ordered by key.
+     */
+    @Override
+    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+      return true;
+    }
+
+    @Override
+    public BoundedReader<Row> createReader(PipelineOptions options) throws IOException {
+      return new BigtableReader(this, service);
+    }
+
+    @Override
+    public void validate() {
+      checkArgument(!tableId.isEmpty(), "tableId cannot be empty");
+    }
+
+    @Override
+    public Coder<Row> getDefaultOutputCoder() {
+      return ProtoCoder.of(Row.class);
+    }
+
+    /**
+     * Splits {@code range} into sub-sources of roughly {@code desiredBundleSizeBytes} each,
+     * assuming {@code sampleSizeBytes} is spread evenly across the range.
+     */
+    private List<BigtableSource> splitKeyRangeIntoBundleSizedSubranges(
+        long sampleSizeBytes, long desiredBundleSizeBytes, ByteKeyRange range) {
+      // Catch the trivial cases. Split is small enough already, or this is the last region.
+      logger.debug(
+          "Subsplit for sampleSizeBytes {} and desiredBundleSizeBytes {}",
+          sampleSizeBytes,
+          desiredBundleSizeBytes);
+      if (sampleSizeBytes <= desiredBundleSizeBytes) {
+        return Collections.singletonList(
+            this.withStartKey(range.getStartKey()).withEndKey(range.getEndKey()));
+      }
+
+      checkArgument(
+          sampleSizeBytes > 0, "Sample size %s bytes must be greater than 0.", sampleSizeBytes);
+      checkArgument(
+          desiredBundleSizeBytes > 0,
+          "Desired bundle size %s bytes must be greater than 0.",
+          desiredBundleSizeBytes);
+
+      int splitCount = (int) Math.ceil(((double) sampleSizeBytes) / (desiredBundleSizeBytes));
+      List<ByteKey> splitKeys = range.split(splitCount);
+      ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder();
+      // Consecutive split keys delimit the sub-ranges: [k0, k1), [k1, k2), ...
+      Iterator<ByteKey> keys = splitKeys.iterator();
+      ByteKey prev = keys.next();
+      while (keys.hasNext()) {
+        ByteKey next = keys.next();
+        splits.add(
+            this
+                .withStartKey(prev)
+                .withEndKey(next)
+                .withEstimatedSizeBytes(sampleSizeBytes / splitCount));
+        prev = next;
+      }
+      return splits.build();
+    }
+
+    /** Returns the key range this source scans. */
+    public ByteKeyRange getRange() {
+      return range;
+    }
+
+    /** Returns the row filter, or {@code null} if none was configured. */
+    public RowFilter getRowFilter() {
+      return filter;
+    }
+
+    /** Returns the id of the table this source reads. */
+    public String getTableId() {
+      return tableId;
+    }
+  }
+
+  /**
+   * Reads the rows of a {@link BigtableSource} through a {@link BigtableService.Reader}. Each
+   * returned row is claimed in a {@link ByteKeyRangeTracker}, which also lets the source be
+   * split dynamically via {@link #splitAtFraction}.
+   */
+  private static class BigtableReader extends BoundedReader<Row> {
+    // Thread-safety: source is protected via synchronization and is only accessed or modified
+    // inside a synchronized block (or constructor, which is the same).
+    private BigtableSource source;
+    private final BigtableService service;
+    // Created in start() and released (set back to null) in close().
+    private BigtableService.Reader reader;
+    private final ByteKeyRangeTracker rangeTracker;
+    // Number of rows successfully claimed from the range tracker; logged on close().
+    private long recordsReturned;
+
+    public BigtableReader(BigtableSource source, BigtableService service) {
+      this.source = source;
+      this.service = service;
+      rangeTracker = ByteKeyRangeTracker.of(source.getRange());
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      reader = service.createReader(getCurrentSource());
+      // A row counts only if the tracker accepts its key (it may lie past a split point).
+      boolean hasRecord =
+          reader.start()
+              && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey()));
+      if (hasRecord) {
+        ++recordsReturned;
+      }
+      return hasRecord;
+    }
+
+    @Override
+    public synchronized BigtableSource getCurrentSource() {
+      return source;
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      boolean hasRecord =
+          reader.advance()
+              && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey()));
+      if (hasRecord) {
+        ++recordsReturned;
+      }
+      return hasRecord;
+    }
+
+    @Override
+    public Row getCurrent() throws NoSuchElementException {
+      return reader.getCurrentRow();
+    }
+
+    @Override
+    public void close() throws IOException {
+      logger.info("Closing reader after reading {} records.", recordsReturned);
+      if (reader != null) {
+        reader.close();
+        reader = null;
+      }
+    }
+
+    @Override
+    public final Double getFractionConsumed() {
+      return rangeTracker.getFractionConsumed();
+    }
+
+    /**
+     * Attempts to split the current source at the key interpolated at {@code fraction} of its
+     * range.
+     *
+     * <p>On success this reader keeps the primary part ({@code [start, splitKey)}) and the
+     * residual ({@code [splitKey, end)}) is returned. Returns {@code null} if no key can be
+     * interpolated or the range tracker rejects the split position.
+     */
+    @Override
+    public final synchronized BigtableSource splitAtFraction(double fraction) {
+      ByteKey splitKey;
+      try {
+        splitKey = source.getRange().interpolateKey(fraction);
+      } catch (IllegalArgumentException e) {
+        // SLF4J substitutes "{}" placeholders only; "%s" would have been logged literally.
+        logger.info("{}: Failed to interpolate key for fraction {}.", source.getRange(), fraction);
+        return null;
+      }
+      logger.debug(
+          "Proposing to split {} at fraction {} (key {})", rangeTracker, fraction, splitKey);
+      if (!rangeTracker.trySplitAtPosition(splitKey)) {
+        return null;
+      }
+      BigtableSource primary = source.withEndKey(splitKey);
+      BigtableSource residual = source.withStartKey(splitKey);
+      this.source = primary;
+      return residual;
+    }
+  }
+
+ private static class Sink
+ extends org.apache.beam.sdk.io.Sink<KV<ByteString, Iterable<Mutation>>> {
+
+ public Sink(String tableId, BigtableService bigtableService) {
+ this.tableId = checkNotNull(tableId, "tableId");
+ this.bigtableService = checkNotNull(bigtableService, "bigtableService");
+ }
+
+ public String getTableId() {
+ return tableId;
+ }
+
+ public BigtableService getBigtableService() {
+ return bigtableService;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(Sink.class)
+ .add("bigtableService", bigtableService)
+ .add("tableId", tableId)
+ .toString();
+ }
+
+ ///////////////////////////////////////////////////////////////////////////////
+ private final String tableId;
+ private final BigtableService bigtableService;
+
+ @Override
+ public WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> createWriteOperation(
+ PipelineOptions options) {
+ return new BigtableWriteOperation(this);
+ }
+
+ /** Does nothing, as it is redundant with {@link Write#validate}. */
+ @Override
+ public void validate(PipelineOptions options) {}
+ }
+
+ private static class BigtableWriteOperation
+ extends WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> {
+ private final Sink sink;
+
+ public BigtableWriteOperation(Sink sink) {
+ this.sink = sink;
+ }
+
+ @Override
+ public Writer<KV<ByteString, Iterable<Mutation>>, Long> createWriter(PipelineOptions options)
+ throws Exception {
+ return new BigtableWriter(this);
+ }
+
+ @Override
+ public void initialize(PipelineOptions options) {}
+
+ @Override
+ public void finalize(Iterable<Long> writerResults, PipelineOptions options) {
+ long count = 0;
+ for (Long value : writerResults) {
+ value += count;
+ }
+ logger.debug("Wrote {} elements to BigtableIO.Sink {}", sink);
+ }
+
+ @Override
+ public Sink getSink() {
+ return sink;
+ }
+
+ @Override
+ public Coder<Long> getWriterResultCoder() {
+ return VarLongCoder.of();
+ }
+ }
+
+ private static class BigtableWriter extends Writer<KV<ByteString, Iterable<Mutation>>, Long> {
+ private final BigtableWriteOperation writeOperation;
+ private final Sink sink;
+ private BigtableService.Writer bigtableWriter;
+ private long recordsWritten;
+ private final ConcurrentLinkedQueue<BigtableWriteException> failures;
+
+ public BigtableWriter(BigtableWriteOperation writeOperation) {
+ this.writeOperation = writeOperation;
+ this.sink = writeOperation.getSink();
+ this.failures = new ConcurrentLinkedQueue<>();
+ }
+
+ @Override
+ public void open(String uId) throws Exception {
+ bigtableWriter = sink.getBigtableService().openForWriting(sink.getTableId());
+ recordsWritten = 0;
+ }
+
+ /**
+ * If any write has asynchronously failed, fail the bundle with a useful error.
+ */
+ private void checkForFailures() throws IOException {
+ // Note that this function is never called by multiple threads and is the only place that
+ // we remove from failures, so this code is safe.
+ if (failures.isEmpty()) {
+ return;
+ }
+
+ StringBuilder logEntry = new StringBuilder();
+ int i = 0;
+ for (; i < 10 && !failures.isEmpty(); ++i) {
+ BigtableWriteException exc = failures.remove();
+ logEntry.append("\n").append(exc.getMessage());
+ if (exc.getCause() != null) {
+ logEntry.append(": ").append(exc.getCause().getMessage());
+ }
+ }
+ String message =
+ String.format(
+ "At least %d errors occurred writing to Bigtable. First %d errors: %s",
+ i + failures.size(),
+ i,
+ logEntry.toString());
+ logger.error(message);
+ throw new IOException(message);
+ }
+
+ @Override
+ public void write(KV<ByteString, Iterable<Mutation>> rowMutations) throws Exception {
+ checkForFailures();
+ Futures.addCallback(
+ bigtableWriter.writeRecord(rowMutations), new WriteExceptionCallback(rowMutations));
+ ++recordsWritten;
+ }
+
+ @Override
+ public Long close() throws Exception {
+ bigtableWriter.close();
+ bigtableWriter = null;
+ checkForFailures();
+ logger.info("Wrote {} records", recordsWritten);
+ return recordsWritten;
+ }
+
+ @Override
+ public WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> getWriteOperation() {
+ return writeOperation;
+ }
+
+ private class WriteExceptionCallback implements FutureCallback<Empty> {
+ private final KV<ByteString, Iterable<Mutation>> value;
+
+ public WriteExceptionCallback(KV<ByteString, Iterable<Mutation>> value) {
+ this.value = value;
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ failures.add(new BigtableWriteException(value, cause));
+ }
+
+ @Override
+ public void onSuccess(Empty produced) {}
+ }
+ }
+
+ /**
+ * An exception that puts information about the failed record being written in its message.
+ */
+ static class BigtableWriteException extends IOException {
+ public BigtableWriteException(KV<ByteString, Iterable<Mutation>> record, Throwable cause) {
+ super(
+ String.format(
+ "Error mutating row %s with mutations %s",
+ record.getKey().toStringUtf8(),
+ record.getValue()),
+ cause);
+ }
+ }
+
+ /**
+ * A helper function to produce a Cloud Bigtable user agent string.
+ */
+ private static String getUserAgent() {
+ String javaVersion = System.getProperty("java.specification.version");
+ ReleaseInfo info = ReleaseInfo.getReleaseInfo();
+ return String.format(
+ "%s/%s (%s); %s",
+ info.getName(),
+ info.getVersion(),
+ javaVersion,
+ "0.2.3" /* TODO get Bigtable client version directly from jar. */);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
new file mode 100644
index 0000000..93e558b
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
+import org.apache.beam.sdk.values.KV;
+
+import com.google.bigtable.v1.Mutation;
+import com.google.bigtable.v1.Row;
+import com.google.bigtable.v1.SampleRowKeysResponse;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Empty;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * An interface for real or fake implementations of Cloud Bigtable.
+ */
+interface BigtableService extends Serializable {
+
+ /**
+ * The interface of a class that can write to Cloud Bigtable.
+ */
+ interface Writer {
+ /**
+ * Writes a single row transaction to Cloud Bigtable. The key of the {@code record} is the
+ * row key to be mutated and the iterable of mutations represent the changes to be made to the
+ * row.
+ *
+ * @throws IOException if there is an error submitting the write.
+ */
+ ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record)
+ throws IOException;
+
+ /**
+ * Closes the writer.
+ *
+ * @throws IOException if any writes did not succeed
+ */
+ void close() throws IOException;
+ }
+
+ /**
+ * The interface of a class that reads from Cloud Bigtable.
+ */
+ interface Reader {
+ /**
+ * Reads the first element (including initialization, such as opening a network connection) and
+ * returns true if an element was found.
+ */
+ boolean start() throws IOException;
+
+ /**
+ * Attempts to read the next element, and returns true if an element has been read.
+ */
+ boolean advance() throws IOException;
+
+ /**
+ * Closes the reader.
+ *
+ * @throws IOException if there is an error.
+ */
+ void close() throws IOException;
+
+ /**
+ * Returns the last row read by a successful start() or advance(), or throws if there is no
+ * current row because the last such call was unsuccessful.
+ */
+ Row getCurrentRow() throws NoSuchElementException;
+ }
+
+ /**
+ * Returns {@code true} if the table with the give name exists.
+ */
+ boolean tableExists(String tableId) throws IOException;
+
+ /**
+ * Returns a {@link Reader} that will read from the specified source.
+ */
+ Reader createReader(BigtableSource source) throws IOException;
+
+ /**
+ * Returns a {@link Writer} that will write to the specified table.
+ */
+ Writer openForWriting(String tableId) throws IOException;
+
+ /**
+ * Returns a set of row keys sampled from the underlying table. These contain information about
+ * the distribution of keys within the table.
+ */
+ List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
new file mode 100644
index 0000000..5933e13
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
+import org.apache.beam.sdk.values.KV;
+
+import com.google.bigtable.admin.table.v1.GetTableRequest;
+import com.google.bigtable.v1.MutateRowRequest;
+import com.google.bigtable.v1.Mutation;
+import com.google.bigtable.v1.ReadRowsRequest;
+import com.google.bigtable.v1.Row;
+import com.google.bigtable.v1.RowRange;
+import com.google.bigtable.v1.SampleRowKeysRequest;
+import com.google.bigtable.v1.SampleRowKeysResponse;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.grpc.BigtableSession;
+import com.google.cloud.bigtable.grpc.async.AsyncExecutor;
+import com.google.cloud.bigtable.grpc.async.HeapSizeManager;
+import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
+import com.google.common.base.MoreObjects;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Empty;
+
+import io.grpc.Status.Code;
+import io.grpc.StatusRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * An implementation of {@link BigtableService} that actually communicates with the Cloud Bigtable
+ * service.
+ */
+class BigtableServiceImpl implements BigtableService {
+ private static final Logger logger = LoggerFactory.getLogger(BigtableService.class);
+
+ public BigtableServiceImpl(BigtableOptions options) {
+ this.options = options;
+ }
+
+ private final BigtableOptions options;
+
+ @Override
+ public BigtableWriterImpl openForWriting(String tableId) throws IOException {
+ BigtableSession session = new BigtableSession(options);
+ String tableName = options.getClusterName().toTableNameStr(tableId);
+ return new BigtableWriterImpl(session, tableName);
+ }
+
+ @Override
+ public boolean tableExists(String tableId) throws IOException {
+ if (!BigtableSession.isAlpnProviderEnabled()) {
+ logger.info(
+ "Skipping existence check for table {} (BigtableOptions {}) because ALPN is not"
+ + " configured.",
+ tableId,
+ options);
+ return true;
+ }
+
+ try (BigtableSession session = new BigtableSession(options)) {
+ GetTableRequest getTable =
+ GetTableRequest.newBuilder()
+ .setName(options.getClusterName().toTableNameStr(tableId))
+ .build();
+ session.getTableAdminClient().getTable(getTable);
+ return true;
+ } catch (StatusRuntimeException e) {
+ if (e.getStatus().getCode() == Code.NOT_FOUND) {
+ return false;
+ }
+ String message =
+ String.format(
+ "Error checking whether table %s (BigtableOptions %s) exists", tableId, options);
+ logger.error(message, e);
+ throw new IOException(message, e);
+ }
+ }
+
+ private class BigtableReaderImpl implements Reader {
+ private BigtableSession session;
+ private final BigtableSource source;
+ private ResultScanner<Row> results;
+ private Row currentRow;
+
+ public BigtableReaderImpl(BigtableSession session, BigtableSource source) {
+ this.session = session;
+ this.source = source;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ RowRange range =
+ RowRange.newBuilder()
+ .setStartKey(source.getRange().getStartKey().getValue())
+ .setEndKey(source.getRange().getEndKey().getValue())
+ .build();
+ ReadRowsRequest.Builder requestB =
+ ReadRowsRequest.newBuilder()
+ .setRowRange(range)
+ .setTableName(options.getClusterName().toTableNameStr(source.getTableId()));
+ if (source.getRowFilter() != null) {
+ requestB.setFilter(source.getRowFilter());
+ }
+ results = session.getDataClient().readRows(requestB.build());
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ currentRow = results.next();
+ return (currentRow != null);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Goal: by the end of this function, both results and session are null and closed,
+ // independent of what errors they throw or prior state.
+
+ if (session == null) {
+ // Only possible when previously closed, so we know that results is also null.
+ return;
+ }
+
+ // Session does not implement Closeable -- it's AutoCloseable. So we can't register it with
+ // the Closer, but we can use the Closer to simplify the error handling.
+ try (Closer closer = Closer.create()) {
+ if (results != null) {
+ closer.register(results);
+ results = null;
+ }
+
+ session.close();
+ } finally {
+ session = null;
+ }
+ }
+
+ @Override
+ public Row getCurrentRow() throws NoSuchElementException {
+ if (currentRow == null) {
+ throw new NoSuchElementException();
+ }
+ return currentRow;
+ }
+ }
+
+ private static class BigtableWriterImpl implements Writer {
+ private BigtableSession session;
+ private AsyncExecutor executor;
+ private final MutateRowRequest.Builder partialBuilder;
+
+ public BigtableWriterImpl(BigtableSession session, String tableName) {
+ this.session = session;
+ this.executor =
+ new AsyncExecutor(
+ session.getDataClient(),
+ new HeapSizeManager(
+ AsyncExecutor.ASYNC_MUTATOR_MAX_MEMORY_DEFAULT,
+ AsyncExecutor.MAX_INFLIGHT_RPCS_DEFAULT));
+
+ partialBuilder = MutateRowRequest.newBuilder().setTableName(tableName);
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (executor != null) {
+ executor.flush();
+ executor = null;
+ }
+ } finally {
+ if (session != null) {
+ session.close();
+ session = null;
+ }
+ }
+ }
+
+ @Override
+ public ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record)
+ throws IOException {
+ MutateRowRequest r =
+ partialBuilder
+ .clone()
+ .setRowKey(record.getKey())
+ .addAllMutations(record.getValue())
+ .build();
+ try {
+ return executor.mutateRowAsync(r);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Write interrupted", e);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects
+ .toStringHelper(BigtableServiceImpl.class)
+ .add("options", options)
+ .toString();
+ }
+
+ @Override
+ public Reader createReader(BigtableSource source) throws IOException {
+ BigtableSession session = new BigtableSession(options);
+ return new BigtableReaderImpl(session, source);
+ }
+
+ @Override
+ public List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) throws IOException {
+ try (BigtableSession session = new BigtableSession(options)) {
+ SampleRowKeysRequest request =
+ SampleRowKeysRequest.newBuilder()
+ .setTableName(options.getClusterName().toTableNameStr(source.getTableId()))
+ .build();
+ return session.getDataClient().sampleRowKeys(request);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/package-info.java
new file mode 100644
index 0000000..c4c8c04
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines transforms for reading from and writing to Google Cloud Bigtable.
+ *
+ * @see org.apache.beam.sdk.io.gcp.bigtable.BigtableIO
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
new file mode 100644
index 0000000..403ad9d
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -0,0 +1,729 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import static org.apache.beam.sdk.testing.SourceTestUtils.assertSourcesEqualReferenceSource;
+import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive;
+import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
+import static org.apache.beam.sdk.testing.SourceTestUtils
+ .assertSplitAtFractionSucceedsAndConsistent;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Verify.verifyNotNull;
+
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
+import org.apache.beam.sdk.io.range.ByteKey;
+import org.apache.beam.sdk.io.range.ByteKeyRange;
+import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+import com.google.bigtable.v1.Cell;
+import com.google.bigtable.v1.Column;
+import com.google.bigtable.v1.Family;
+import com.google.bigtable.v1.Mutation;
+import com.google.bigtable.v1.Mutation.SetCell;
+import com.google.bigtable.v1.Row;
+import com.google.bigtable.v1.RowFilter;
+import com.google.bigtable.v1.SampleRowKeysResponse;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Empty;
+
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import javax.annotation.Nullable;
+
+/**
+ * Unit tests for {@link BigtableIO}.
+ */
+@RunWith(JUnit4.class)
+public class BigtableIOTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+ @Rule public ExpectedLogs logged = ExpectedLogs.none(BigtableIO.class);
+
+ /**
+ * These tests requires a static instance of the {@link FakeBigtableService} because the writers
+ * go through a serialization step when executing the test and would not affect passed-in objects
+ * otherwise.
+ */
+ private static FakeBigtableService service;
+ private static final BigtableOptions BIGTABLE_OPTIONS =
+ new BigtableOptions.Builder()
+ .setProjectId("project")
+ .setClusterId("cluster")
+ .setZoneId("zone")
+ .build();
+ private static BigtableIO.Read defaultRead =
+ BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS);
+ private static BigtableIO.Write defaultWrite =
+ BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS);
+ private Coder<KV<ByteString, Iterable<Mutation>>> bigtableCoder;
+ private static final TypeDescriptor<KV<ByteString, Iterable<Mutation>>> BIGTABLE_WRITE_TYPE =
+ new TypeDescriptor<KV<ByteString, Iterable<Mutation>>>() {};
+
+ @Before
+ public void setup() throws Exception {
+ service = new FakeBigtableService();
+ defaultRead = defaultRead.withBigtableService(service);
+ defaultWrite = defaultWrite.withBigtableService(service);
+ bigtableCoder = TestPipeline.create().getCoderRegistry().getCoder(BIGTABLE_WRITE_TYPE);
+ }
+
+ @Test
+ public void testReadBuildsCorrectly() {
+ BigtableIO.Read read =
+ BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS).withTableId("table");
+ assertEquals("project", read.getBigtableOptions().getProjectId());
+ assertEquals("cluster", read.getBigtableOptions().getClusterId());
+ assertEquals("zone", read.getBigtableOptions().getZoneId());
+ assertEquals("table", read.getTableId());
+ }
+
+ @Test
+ public void testReadBuildsCorrectlyInDifferentOrder() {
+ BigtableIO.Read read =
+ BigtableIO.read().withTableId("table").withBigtableOptions(BIGTABLE_OPTIONS);
+ assertEquals("project", read.getBigtableOptions().getProjectId());
+ assertEquals("cluster", read.getBigtableOptions().getClusterId());
+ assertEquals("zone", read.getBigtableOptions().getZoneId());
+ assertEquals("table", read.getTableId());
+ }
+
+ @Test
+ public void testWriteBuildsCorrectly() {
+ BigtableIO.Write write =
+ BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS).withTableId("table");
+ assertEquals("table", write.getTableId());
+ assertEquals("project", write.getBigtableOptions().getProjectId());
+ assertEquals("zone", write.getBigtableOptions().getZoneId());
+ assertEquals("cluster", write.getBigtableOptions().getClusterId());
+ }
+
+ @Test
+ public void testWriteBuildsCorrectlyInDifferentOrder() {
+ BigtableIO.Write write =
+ BigtableIO.write().withTableId("table").withBigtableOptions(BIGTABLE_OPTIONS);
+ assertEquals("cluster", write.getBigtableOptions().getClusterId());
+ assertEquals("project", write.getBigtableOptions().getProjectId());
+ assertEquals("zone", write.getBigtableOptions().getZoneId());
+ assertEquals("table", write.getTableId());
+ }
+
+ @Test
+ public void testWriteValidationFailsMissingTable() {
+ BigtableIO.Write write = BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS);
+
+ thrown.expect(IllegalArgumentException.class);
+
+ write.validate(null);
+ }
+
+ @Test
+ public void testWriteValidationFailsMissingOptions() {
+ BigtableIO.Write write = BigtableIO.write().withTableId("table");
+
+ thrown.expect(IllegalArgumentException.class);
+
+ write.validate(null);
+ }
+
+ /** Helper function to make a single row mutation to be written. */
+ private static KV<ByteString, Iterable<Mutation>> makeWrite(String key, String value) {
+ ByteString rowKey = ByteString.copyFromUtf8(key);
+ Iterable<Mutation> mutations =
+ ImmutableList.of(
+ Mutation.newBuilder()
+ .setSetCell(SetCell.newBuilder().setValue(ByteString.copyFromUtf8(value)))
+ .build());
+ return KV.of(rowKey, mutations);
+ }
+
+ /** Helper function to make a single bad row mutation (no set cell). */
+ private static KV<ByteString, Iterable<Mutation>> makeBadWrite(String key) {
+ Iterable<Mutation> mutations = ImmutableList.of(Mutation.newBuilder().build());
+ return KV.of(ByteString.copyFromUtf8(key), mutations);
+ }
+
+ /** Tests that when reading from a non-existent table, the read fails. */
+ @Test
+ public void testReadingFailsTableDoesNotExist() throws Exception {
+ final String table = "TEST-TABLE";
+
+ BigtableIO.Read read =
+ BigtableIO.read()
+ .withBigtableOptions(BIGTABLE_OPTIONS)
+ .withTableId(table)
+ .withBigtableService(service);
+
+ // Exception will be thrown by read.validate() when read is applied.
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(String.format("Table %s does not exist", table));
+
+ TestPipeline.create().apply(read);
+ }
+
+ /** Tests that when reading from an empty table, the read succeeds. */
+ @Test
+ public void testReadingEmptyTable() throws Exception {
+ final String table = "TEST-EMPTY-TABLE";
+ service.createTable(table);
+
+ TestPipeline p = TestPipeline.create();
+ PCollection<Row> rows = p.apply(defaultRead.withTableId(table));
+ PAssert.that(rows).empty();
+
+ p.run();
+ logged.verifyInfo(String.format("Closing reader after reading 0 records."));
+ }
+
+ /** Tests reading all rows from a table. */
+ @Test
+ public void testReading() throws Exception {
+ final String table = "TEST-MANY-ROWS-TABLE";
+ final int numRows = 1001;
+ List<Row> testRows = makeTableData(table, numRows);
+
+ TestPipeline p = TestPipeline.create();
+ PCollection<Row> rows = p.apply(defaultRead.withTableId(table));
+ PAssert.that(rows).containsInAnyOrder(testRows);
+
+ p.run();
+ logged.verifyInfo(String.format("Closing reader after reading %d records.", numRows));
+ }
+
+ /** A {@link Predicate} that a {@link Row Row's} key matches the given regex. */
+ private static class KeyMatchesRegex implements Predicate<ByteString> {
+ private final String regex;
+
+ public KeyMatchesRegex(String regex) {
+ this.regex = regex;
+ }
+
+ @Override
+ public boolean apply(@Nullable ByteString input) {
+ verifyNotNull(input, "input");
+ return input.toStringUtf8().matches(regex);
+ }
+ }
+
+ /** Tests reading all rows using a filter. */
+ @Test
+ public void testReadingWithFilter() throws Exception {
+ final String table = "TEST-FILTER-TABLE";
+ final int numRows = 1001;
+ List<Row> testRows = makeTableData(table, numRows);
+ String regex = ".*17.*";
+ final KeyMatchesRegex keyPredicate = new KeyMatchesRegex(regex);
+ Iterable<Row> filteredRows =
+ Iterables.filter(
+ testRows,
+ new Predicate<Row>() {
+ @Override
+ public boolean apply(@Nullable Row input) {
+ verifyNotNull(input, "input");
+ return keyPredicate.apply(input.getKey());
+ }
+ });
+
+ RowFilter filter =
+ RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(regex)).build();
+
+ TestPipeline p = TestPipeline.create();
+ PCollection<Row> rows = p.apply(defaultRead.withTableId(table).withRowFilter(filter));
+ PAssert.that(rows).containsInAnyOrder(filteredRows);
+
+ p.run();
+ }
+
+ /**
+ * Exhaustively exercises dynamic work rebalancing on a small table.
+ *
+ * <p>The test is slow and is therefore ignored by default. Re-enable it whenever the
+ * {@link BigtableIO.Read} implementation changes.
+ */
+ @Ignore("Slow. Rerun when changing the implementation.")
+ @Test
+ public void testReadingSplitAtFractionExhaustive() throws Exception {
+ final String table = "TEST-FEW-ROWS-SPLIT-EXHAUSTIVE-TABLE";
+ final int numRows = 10;
+ final int numSamples = 1;
+ final long bytesPerRow = 1L;
+
+ makeTableData(table, numRows);
+ service.setupSampleRowKeys(table, numSamples, bytesPerRow);
+ ByteKeyRange tableRange = service.getTableRange(table);
+
+ BigtableSource source =
+ new BigtableSource(service, table, null /* filter */, tableRange, null /* size */);
+ assertSplitAtFractionExhaustive(source, null /* options */);
+ }
+
+ /**
+ * Tests {@code splitAtFraction} on a ten-row table after various numbers of rows have been read.
+ */
+ @Test
+ public void testReadingSplitAtFraction() throws Exception {
+ final String table = "TEST-SPLIT-AT-FRACTION";
+ final int numRows = 10;
+ final int numSamples = 1;
+ final long bytesPerRow = 1L;
+ makeTableData(table, numRows);
+ service.setupSampleRowKeys(table, numSamples, bytesPerRow);
+
+ ByteKeyRange tableRange = service.getTableRange(table);
+ BigtableSource source =
+ new BigtableSource(service, table, null /* filter */, tableRange, null /* size */);
+
+ // Before any item is read, every split request is rejected.
+ assertSplitAtFractionFails(source, 0, 0.1, null /* options */);
+ assertSplitAtFractionFails(source, 0, 1.0, null /* options */);
+ // After 1 item is read, split requests past the first tenth are accepted.
+ assertSplitAtFractionSucceedsAndConsistent(source, 1, 0.333, null /* options */);
+ assertSplitAtFractionSucceedsAndConsistent(source, 1, 0.666, null /* options */);
+ // After 3 items are read, only split requests past three tenths are accepted.
+ assertSplitAtFractionFails(source, 3, 0.2, null /* options */);
+ assertSplitAtFractionSucceedsAndConsistent(source, 3, 0.571, null /* options */);
+ assertSplitAtFractionSucceedsAndConsistent(source, 3, 0.9, null /* options */);
+ // After 6 items are read, only split requests past six tenths are accepted.
+ assertSplitAtFractionFails(source, 6, 0.5, null /* options */);
+ assertSplitAtFractionSucceedsAndConsistent(source, 6, 0.7, null /* options */);
+ }
+
+ /** Tests reading all rows from a split table. */
+ @Test
+ public void testReadingWithSplits() throws Exception {
+ final String table = "TEST-MANY-ROWS-SPLITS-TABLE";
+ final int numRows = 1500;
+ final int numSamples = 10;
+ final long bytesPerRow = 100L;
+
+ // Set up test table data and sample row keys for size estimation and splitting.
+ makeTableData(table, numRows);
+ service.setupSampleRowKeys(table, numSamples, bytesPerRow);
+
+ // Generate source and split it.
+ BigtableSource source =
+ new BigtableSource(service, table, null /*filter*/, ByteKeyRange.ALL_KEYS, null /*size*/);
+ List<BigtableSource> splits =
+ source.splitIntoBundles(numRows * bytesPerRow / numSamples, null /* options */);
+
+ // Test num splits and split equality.
+ assertThat(splits, hasSize(numSamples));
+ assertSourcesEqualReferenceSource(source, splits, null /* options */);
+ }
+
+ /**
+ * Tests that a table can be split into more bundles than it has sample row keys, and that the
+ * bundles together read the same rows as the unsplit source.
+ */
+ @Test
+ public void testReadingWithSubSplits() throws Exception {
+ // Distinct from the table id used by testReadingWithSplits so the tests never share a table.
+ final String table = "TEST-MANY-ROWS-SUB-SPLITS-TABLE";
+ final int numRows = 1000;
+ final int numSamples = 10;
+ final int numSplits = 20;
+ final long bytesPerRow = 100L;
+
+ // Set up test table data and sample row keys for size estimation and splitting.
+ makeTableData(table, numRows);
+ service.setupSampleRowKeys(table, numSamples, bytesPerRow);
+
+ // Request bundles half the size of a sample interval, forcing each interval to be sub-split.
+ BigtableSource source =
+ new BigtableSource(service, table, null /*filter*/, ByteKeyRange.ALL_KEYS, null /*size*/);
+ List<BigtableSource> splits =
+ source.splitIntoBundles(numRows * bytesPerRow / numSplits, null /* options */);
+
+ // Test num splits and split equality.
+ assertThat(splits, hasSize(numSplits));
+ assertSourcesEqualReferenceSource(source, splits, null /* options */);
+ }
+
+ /**
+ * Tests reading a sub-split table through a row-key regex filter: the split count matches the
+ * requested bundle size, and the filtered bundles together read the same rows as the unsplit,
+ * filtered source.
+ */
+ @Test
+ public void testReadingWithFilterAndSubSplits() throws Exception {
+ final String table = "TEST-FILTER-SUB-SPLITS";
+ final int numRows = 1700;
+ final int numSamples = 10;
+ final int numSplits = 20;
+ final long bytesPerRow = 100L;
+ final String keyRegex = ".*17.*";
+
+ // Set up test table data and sample row keys for size estimation and splitting.
+ makeTableData(table, numRows);
+ service.setupSampleRowKeys(table, numSamples, bytesPerRow);
+
+ // Generate a filtered source and split it.
+ RowFilter filter =
+ RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(keyRegex)).build();
+ BigtableSource source =
+ new BigtableSource(service, table, filter, ByteKeyRange.ALL_KEYS, null /*size*/);
+ List<BigtableSource> splits =
+ source.splitIntoBundles(numRows * bytesPerRow / numSplits, null /* options */);
+
+ // Test num splits and split equality.
+ assertThat(splits, hasSize(numSplits));
+ assertSourcesEqualReferenceSource(source, splits, null /* options */);
+ }
+
+ /** Tests the display data reported by a fully configured {@link BigtableIO.Read}. */
+ @Test
+ public void testReadingDisplayData() {
+ ByteString keyRegex = ByteString.copyFromUtf8("foo.*");
+ RowFilter rowFilter = RowFilter.newBuilder().setRowKeyRegexFilter(keyRegex).build();
+ BigtableIO.Read read = BigtableIO.read()
+ .withBigtableOptions(BIGTABLE_OPTIONS)
+ .withTableId("fooTable")
+ .withRowFilter(rowFilter);
+
+ DisplayData displayData = DisplayData.from(read);
+ assertThat(displayData, hasDisplayItem("tableId", "fooTable"));
+ assertThat(displayData, hasDisplayItem("rowFilter", rowFilter.toString()));
+ // The options gain a user agent inside BigtableIO, so only the key is checked.
+ assertThat(displayData, hasDisplayItem(hasKey("bigtableOptions")));
+ }
+
+ /** Tests that a single record reaches the fake service and that the write is logged. */
+ @Test
+ public void testWriting() throws Exception {
+ final String table = "table";
+ final String key = "key";
+ final String value = "value";
+ service.createTable(table);
+
+ TestPipeline pipeline = TestPipeline.create();
+ pipeline
+ .apply("single row", Create.of(makeWrite(key, value)).withCoder(bigtableCoder))
+ .apply("write", defaultWrite.withTableId(table));
+ pipeline.run();
+
+ logged.verifyInfo("Wrote 1 records");
+
+ // Exactly one table exists, holding exactly the row that was written.
+ Map<ByteString, ByteString> writtenRows = service.getTable(table);
+ assertEquals(1, service.tables.size());
+ assertNotNull(writtenRows);
+ assertEquals(1, writtenRows.size());
+ ByteString expectedValue = ByteString.copyFromUtf8(value);
+ assertEquals(expectedValue, writtenRows.get(ByteString.copyFromUtf8(key)));
+ }
+
+ /** Tests that applying a write to a table that was never created fails validation. */
+ @Test
+ public void testWritingFailsTableDoesNotExist() throws Exception {
+ final String table = "TEST-TABLE";
+ TestPipeline pipeline = TestPipeline.create();
+ PCollection<KV<ByteString, Iterable<Mutation>>> emptyInput =
+ pipeline.apply(Create.<KV<ByteString, Iterable<Mutation>>>of());
+
+ // write.validate() runs as the transform is applied and raises the expected exception.
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(String.format("Table %s does not exist", table));
+ emptyInput.apply("write", defaultWrite.withTableId(table));
+ }
+
+ /** Tests that when writing an element fails, the write fails. */
+ @Test
+ public void testWritingFailsBadElement() throws Exception {
+ final String table = "TEST-TABLE";
+ final String key = "KEY";
+ service.createTable(table);
+
+ TestPipeline p = TestPipeline.create();
+ p.apply(Create.of(makeBadWrite(key)).withCoder(bigtableCoder))
+ .apply(defaultWrite.withTableId(table));
+
+ thrown.expect(PipelineExecutionException.class);
+ thrown.expectCause(Matchers.<Throwable>instanceOf(IOException.class));
+ thrown.expectMessage("At least 1 errors occurred writing to Bigtable. First 1 errors:");
+ thrown.expectMessage("Error mutating row " + key + " with mutations []: cell value missing");
+ p.run();
+ }
+
+ /** Tests the display data reported by a configured {@link BigtableIO.Write}. */
+ @Test
+ public void testWritingDisplayData() {
+ BigtableIO.Write write =
+ BigtableIO.write().withTableId("fooTable").withBigtableOptions(BIGTABLE_OPTIONS);
+
+ DisplayData displayData = DisplayData.from(write);
+ assertThat(displayData, hasDisplayItem("tableId", "fooTable"));
+ // The options gain a user agent inside BigtableIO, so only the key is checked.
+ assertThat(displayData, hasDisplayItem(hasKey("bigtableOptions")));
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////
+ // Column family and column used for the single cell of every row built by makeRow.
+ private static final String COLUMN_FAMILY_NAME = "family";
+ private static final ByteString COLUMN_NAME = ByteString.copyFromUtf8("column");
+ private static final Column TEST_COLUMN =
+ Column.newBuilder().setQualifier(COLUMN_NAME).build();
+ private static final Family TEST_FAMILY =
+ Family.newBuilder().setName(COLUMN_FAMILY_NAME).build();
+
+ /**
+ * Helper function that builds a {@link Row} in a test table that could be returned by read.
+ * The row has the given key and a single cell holding {@code value} in {@code TEST_COLUMN} of
+ * {@code TEST_FAMILY}.
+ */
+ private static Row makeRow(ByteString key, ByteString value) {
+ Cell cell = Cell.newBuilder().setValue(value).build();
+ Column.Builder column = TEST_COLUMN.toBuilder().addCells(cell);
+ return Row.newBuilder()
+ .setKey(key)
+ .addFamilies(TEST_FAMILY.toBuilder().addColumns(column))
+ .build();
+ }
+
+ /**
+ * Creates table {@code tableId} in the fake service and fills it with {@code numRows} rows. Row
+ * keys and values are zero-padded ("key000000000", "value000000000", ...). Returns the matching
+ * {@link Row} protos in insertion order.
+ */
+ private static List<Row> makeTableData(String tableId, int numRows) {
+ service.createTable(tableId);
+ Map<ByteString, ByteString> tableContents = service.getTable(tableId);
+
+ List<Row> createdRows = new ArrayList<>(numRows);
+ int rowIndex = 0;
+ while (rowIndex < numRows) {
+ ByteString key = ByteString.copyFromUtf8(String.format("key%09d", rowIndex));
+ ByteString value = ByteString.copyFromUtf8(String.format("value%09d", rowIndex));
+ tableContents.put(key, value);
+ createdRows.add(makeRow(key, value));
+ rowIndex++;
+ }
+ return createdRows;
+ }
+
+
+ /**
+ * An in-memory {@link BigtableService}. Each table is a sorted map from row key to a single
+ * value, and sample row keys are precomputed per table by {@link #setupSampleRowKeys}.
+ */
+ private static class FakeBigtableService implements BigtableService {
+ private final Map<String, SortedMap<ByteString, ByteString>> tables = new HashMap<>();
+ private final Map<String, List<SampleRowKeysResponse>> sampleRowKeys = new HashMap<>();
+
+ /** Returns the contents of the given table, or {@code null} if it was never created. */
+ @Nullable
+ public SortedMap<ByteString, ByteString> getTable(String tableId) {
+ return tables.get(tableId);
+ }
+
+ /**
+ * Returns the key range whose start is the table's first row key and whose end is its last
+ * row key. Throws if the table is empty.
+ *
+ * <p>NOTE(review): {@link ByteKeyRange} end keys are exclusive, so the last row of the table
+ * falls outside this range. Tests using this range may depend on that; confirm before changing.
+ */
+ public ByteKeyRange getTableRange(String tableId) {
+ verifyTableExists(tableId);
+ SortedMap<ByteString, ByteString> rows = tables.get(tableId);
+ ByteKey firstKey = ByteKey.of(rows.firstKey());
+ ByteKey lastKey = ByteKey.of(rows.lastKey());
+ return ByteKeyRange.of(firstKey, lastKey);
+ }
+
+ /** Creates the table, replacing any existing table of the same id with an empty one. */
+ public void createTable(String tableId) {
+ SortedMap<ByteString, ByteString> emptyTable =
+ new TreeMap<ByteString, ByteString>(new ByteStringComparator());
+ tables.put(tableId, emptyTable);
+ }
+
+ @Override
+ public boolean tableExists(String tableId) {
+ return tables.containsKey(tableId);
+ }
+
+ /** Throws {@link IllegalArgumentException} if the table has not been created. */
+ public void verifyTableExists(String tableId) {
+ checkArgument(tableExists(tableId), "Table %s does not exist", tableId);
+ }
+
+ @Override
+ public FakeBigtableReader createReader(BigtableSource source) {
+ return new FakeBigtableReader(source);
+ }
+
+ @Override
+ public FakeBigtableWriter openForWriting(String tableId) {
+ return new FakeBigtableWriter(tableId);
+ }
+
+ @Override
+ public List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) {
+ String tableId = source.getTableId();
+ List<SampleRowKeysResponse> samples = sampleRowKeys.get(tableId);
+ checkArgument(samples != null, "No samples found for table %s", tableId);
+ return samples;
+ }
+
+ /**
+ * Precomputes sample row keys for the given table. A sample is taken at the first key at or
+ * beyond each {@code 1/numSamples} fraction of the table, reporting the bytes (at
+ * {@code bytesPerRow} per row) of all rows before that key. A final sample with an empty row
+ * key accounts for every row in the table.
+ */
+ void setupSampleRowKeys(String tableId, int numSamples, long bytesPerRow) {
+ verifyTableExists(tableId);
+ checkArgument(numSamples > 0, "Number of samples must be positive: %s", numSamples);
+ checkArgument(bytesPerRow > 0, "Bytes/Row must be positive: %s", bytesPerRow);
+
+ SortedMap<ByteString, ByteString> rows = getTable(tableId);
+ ImmutableList.Builder<SampleRowKeysResponse> samples = ImmutableList.builder();
+ int nextSample = 1;
+ int rowIndex = 0;
+ for (ByteString rowKey : rows.keySet()) {
+ double fractionBefore = ((double) rowIndex) / rows.size();
+ double sampleFraction = ((double) nextSample) / numSamples;
+ if (fractionBefore >= sampleFraction) {
+ // Sample at this key, reporting the bytes of all rows that precede it.
+ samples.add(SampleRowKeysResponse.newBuilder()
+ .setRowKey(rowKey)
+ .setOffsetBytes(rowIndex * bytesPerRow)
+ .build());
+ nextSample++;
+ }
+ rowIndex++;
+ }
+
+ // The terminal sample leaves the row key unset and covers the whole table.
+ samples.add(
+ SampleRowKeysResponse.newBuilder().setOffsetBytes(rows.size() * bytesPerRow).build());
+ sampleRowKeys.put(tableId, samples.build());
+ }
+ }
+
+ /**
+ * A {@link BigtableService.Reader} implementation that reads from the static instance of
+ * {@link FakeBigtableService} stored in {@link #service}.
+ *
+ * <p>Rows are returned in table key order, restricted to the source's key range. The only
+ * supported {@link RowFilter} is a row-key regex filter; any filter without a row-key regex is
+ * rejected by the constructor with an {@link IllegalArgumentException}.
+ */
+ private static class FakeBigtableReader implements BigtableService.Reader {
+ private final BigtableSource source;
+ private final Predicate<ByteString> filter;
+ private Iterator<Map.Entry<ByteString, ByteString>> rows;
+ private Row currentRow;
+
+ public FakeBigtableReader(BigtableSource source) {
+ this.source = source;
+ if (source.getRowFilter() == null) {
+ filter = Predicates.alwaysTrue();
+ } else {
+ ByteString keyRegex = source.getRowFilter().getRowKeyRegexFilter();
+ // An empty row-key regex means some other filter type, which this fake cannot apply.
+ checkArgument(!keyRegex.isEmpty(), "Only RowKeyRegexFilter is supported");
+ filter = new KeyMatchesRegex(keyRegex.toStringUtf8());
+ }
+ service.verifyTableExists(source.getTableId());
+ }
+
+ @Override
+ public boolean start() {
+ rows = service.tables.get(source.getTableId()).entrySet().iterator();
+ return advance();
+ }
+
+ @Override
+ public boolean advance() {
+ // Skip entries that fail the key filter or fall outside this source's key range.
+ while (rows.hasNext()) {
+ Map.Entry<ByteString, ByteString> entry = rows.next();
+ if (filter.apply(entry.getKey())
+ && source.getRange().containsKey(ByteKey.of(entry.getKey()))) {
+ currentRow = makeRow(entry.getKey(), entry.getValue());
+ return true;
+ }
+ }
+ // No more matching rows.
+ currentRow = null;
+ return false;
+ }
+
+ @Override
+ public Row getCurrentRow() {
+ if (currentRow == null) {
+ throw new NoSuchElementException();
+ }
+ return currentRow;
+ }
+
+ @Override
+ public void close() {
+ rows = null;
+ currentRow = null;
+ }
+ }
+
+ /**
+ * A {@link BigtableService.Writer} backed by the static {@link FakeBigtableService} in
+ * {@link #service}.
+ *
+ * <p>Every {@link Mutation} is read as a {@link SetCell}. Its value overwrites the row's single
+ * stored value, and its column family is ignored. If a mutation's {@link SetCell} value is empty
+ * (for example, because no {@link SetCell} was provided), the write fails with an
+ * {@link IOException} on the returned {@link ListenableFuture}. Earlier mutations of the same
+ * record have already been applied by then.
+ */
+ private static class FakeBigtableWriter implements BigtableService.Writer {
+ private final String tableId;
+
+ public FakeBigtableWriter(String tableId) {
+ this.tableId = tableId;
+ }
+
+ @Override
+ public ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record) {
+ service.verifyTableExists(tableId);
+ Map<ByteString, ByteString> destination = service.getTable(tableId);
+ ByteString rowKey = record.getKey();
+ for (Mutation mutation : record.getValue()) {
+ SetCell setCell = mutation.getSetCell();
+ ByteString newValue = setCell.getValue();
+ if (newValue.isEmpty()) {
+ return Futures.immediateFailedCheckedFuture(new IOException("cell value missing"));
+ }
+ destination.put(rowKey, newValue);
+ }
+ return Futures.immediateFuture(Empty.getDefaultInstance());
+ }
+
+ @Override
+ public void close() {}
+ }
+
+ /** A serializable comparator for ByteString. Used to make row samples. */
+ private static final class ByteStringComparator implements Comparator<ByteString>, Serializable {
+ @Override
+ public int compare(ByteString o1, ByteString o2) {
+ return ByteKey.of(o1).compareTo(ByteKey.of(o2));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 95d1f55..0027e0e 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -35,6 +35,7 @@
(sources and sinks) to consume and produce data from systems.</description>
<modules>
+ <module>google-cloud-platform</module>
<module>hdfs</module>
<module>kafka</module>
</modules>
[2/3] incubator-beam git commit: [BEAM-77] Move bigtable in IO
Posted by dh...@apache.org.
[BEAM-77] Move bigtable in IO
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d138ae54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d138ae54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d138ae54
Branch: refs/heads/master
Commit: d138ae5445315ced3e0a198ab1e99773a5ebfee0
Parents: 9746f0d
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed Apr 20 07:40:49 2016 +0200
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 26 14:08:54 2016 -0700
----------------------------------------------------------------------
pom.xml | 1 -
sdks/java/core/pom.xml | 25 -
.../apache/beam/sdk/io/bigtable/BigtableIO.java | 1016 ------------------
.../beam/sdk/io/bigtable/BigtableService.java | 111 --
.../sdk/io/bigtable/BigtableServiceImpl.java | 244 -----
.../beam/sdk/io/bigtable/package-info.java | 23 -
.../beam/sdk/io/bigtable/BigtableIOTest.java | 729 -------------
sdks/java/io/google-cloud-platform/pom.xml | 96 ++
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 1016 ++++++++++++++++++
.../sdk/io/gcp/bigtable/BigtableService.java | 111 ++
.../io/gcp/bigtable/BigtableServiceImpl.java | 244 +++++
.../beam/sdk/io/gcp/bigtable/package-info.java | 23 +
.../sdk/io/gcp/bigtable/BigtableIOTest.java | 729 +++++++++++++
sdks/java/io/pom.xml | 1 +
14 files changed, 2220 insertions(+), 2149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 03f0fb2..1bd18ec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,7 +102,6 @@
<!-- If updating dependencies, please update any relevant javadoc offlineLinks -->
<avro.version>1.7.7</avro.version>
<bigquery.version>v2-rev248-1.21.0</bigquery.version>
- <bigtable.version>0.2.3</bigtable.version>
<pubsubgrpc.version>0.0.2</pubsubgrpc.version>
<clouddebugger.version>v2-rev6-1.21.0</clouddebugger.version>
<dataflow.version>v1b3-rev22-1.21.0</dataflow.version>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index c634e9c..07d2fce 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -227,7 +227,6 @@
<shadeTestJar>true</shadeTestJar>
<artifactSet>
<includes>
- <include>com.google.cloud.bigtable:bigtable-client-core</include>
<include>com.google.guava:guava</include>
</includes>
</artifactSet>
@@ -253,17 +252,6 @@
<pattern>com.google.thirdparty</pattern>
<shadedPattern>org.apache.beam.sdk.repackaged.com.google.thirdparty</shadedPattern>
</relocation>
- <relocation>
- <pattern>com.google.cloud.bigtable</pattern>
- <shadedPattern>org.apache.beam.sdk.repackaged.com.google.cloud.bigtable</shadedPattern>
- <excludes>
- <exclude>com.google.cloud.bigtable.config.BigtableOptions*</exclude>
- <exclude>com.google.cloud.bigtable.config.CredentialOptions*</exclude>
- <exclude>com.google.cloud.bigtable.config.RetryOptions*</exclude>
- <exclude>com.google.cloud.bigtable.grpc.BigtableClusterName</exclude>
- <exclude>com.google.cloud.bigtable.grpc.BigtableTableName</exclude>
- </excludes>
- </relocation>
</relocations>
</configuration>
</execution>
@@ -281,7 +269,6 @@
<finalName>${project.artifactId}-bundled-${project.version}</finalName>
<artifactSet>
<excludes>
- <exclude>com.google.cloud.bigtable:bigtable-client-core</exclude>
<exclude>com.google.guava:guava</exclude>
</excludes>
</artifactSet>
@@ -394,18 +381,6 @@
</dependency>
<dependency>
- <groupId>com.google.cloud.bigtable</groupId>
- <artifactId>bigtable-protos</artifactId>
- <version>${bigtable.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.google.cloud.bigtable</groupId>
- <artifactId>bigtable-client-core</artifactId>
- <version>${bigtable.version}</version>
- </dependency>
-
- <dependency>
<groupId>com.google.api-client</groupId>
<artifactId>google-api-client</artifactId>
<version>${google-clients.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableIO.java
deleted file mode 100644
index 28ff335..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableIO.java
+++ /dev/null
@@ -1,1016 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.bigtable;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.io.Sink.WriteOperation;
-import org.apache.beam.sdk.io.Sink.Writer;
-import org.apache.beam.sdk.io.range.ByteKey;
-import org.apache.beam.sdk.io.range.ByteKeyRange;
-import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.ReleaseInfo;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.RowFilter;
-import com.google.bigtable.v1.SampleRowKeysResponse;
-import com.google.cloud.bigtable.config.BigtableOptions;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import javax.annotation.Nullable;
-
-/**
- * A bounded source and sink for Google Cloud Bigtable.
- *
- * <p>For more information, see the online documentation at
- * <a href="https://cloud.google.com/bigtable/">Google Cloud Bigtable</a>.
- *
- * <h3>Reading from Cloud Bigtable</h3>
- *
- * <p>The Bigtable source returns a set of rows from a single table, returning a
- * {@code PCollection<Row>}.
- *
- * <p>To configure a Cloud Bigtable source, you must supply a table id and a {@link BigtableOptions}
- * or builder configured with the project and other information necessary to identify the
- * Bigtable cluster. A {@link RowFilter} may also optionally be specified using
- * {@link BigtableIO.Read#withRowFilter}. For example:
- *
- * <pre>{@code
- * BigtableOptions.Builder optionsBuilder =
- * new BigtableOptions.Builder()
- * .setProjectId("project")
- * .setClusterId("cluster")
- * .setZoneId("zone");
- *
- * Pipeline p = ...;
- *
- * // Scan the entire table.
- * p.apply("read",
- * BigtableIO.read()
- * .withBigtableOptions(optionsBuilder)
- * .withTableId("table"));
- *
- * // Scan a subset of rows that match the specified row filter.
- * p.apply("filtered read",
- * BigtableIO.read()
- * .withBigtableOptions(optionsBuilder)
- * .withTableId("table")
- * .withRowFilter(filter));
- * }</pre>
- *
- * <h3>Writing to Cloud Bigtable</h3>
- *
- * <p>The Bigtable sink executes a set of row mutations on a single table. It takes as input a
- * {@link PCollection PCollection<KV<ByteString, Iterable<Mutation>>>}, where the
- * {@link ByteString} is the key of the row being mutated, and each {@link Mutation} represents an
- * idempotent transformation to that row.
- *
- * <p>To configure a Cloud Bigtable sink, you must supply a table id and a {@link BigtableOptions}
- * or builder configured with the project and other information necessary to identify the
- * Bigtable cluster, for example:
- *
- * <pre>{@code
- * BigtableOptions.Builder optionsBuilder =
- * new BigtableOptions.Builder()
- * .setProjectId("project")
- * .setClusterId("cluster")
- * .setZoneId("zone");
- *
- * PCollection<KV<ByteString, Iterable<Mutation>>> data = ...;
- *
- * data.apply("write",
- * BigtableIO.write()
- * .withBigtableOptions(optionsBuilder)
- * .withTableId("table"));
- * }</pre>
- *
- * <h3>Experimental</h3>
- *
- * <p>This connector for Cloud Bigtable is considered experimental and may break or receive
- * backwards-incompatible changes in future versions of the Cloud Dataflow SDK. Cloud Bigtable is
- * in Beta, and thus it may introduce breaking changes in future revisions of its service or APIs.
- *
- * <h3>Permissions</h3>
- *
- * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
- * Dataflow job. Please refer to the documentation of corresponding
- * {@link PipelineRunner PipelineRunners} for more details.
- */
-@Experimental
-public class BigtableIO {
- private static final Logger logger = LoggerFactory.getLogger(BigtableIO.class);
-
- /**
- * Returns an unconfigured {@link BigtableIO.Read}: no options, an empty table id and no row
- * filter. Before use it must be given a
- * {@link BigtableIO.Read#withBigtableOptions(BigtableOptions) BigtableOptions} that identifies
- * the source Cloud Bigtable cluster and a {@link BigtableIO.Read#withTableId tableId} that
- * selects the table to read. A {@link RowFilter} may optionally be supplied through
- * {@link BigtableIO.Read#withRowFilter}.
- */
- @Experimental
- public static Read read() {
- final String unsetTableId = "";
- return new Read(null /* options */, unsetTableId, null /* filter */, null /* service */);
- }
-
- /**
- * Returns an unconfigured {@link BigtableIO.Write} with no options and an empty table id. Before
- * use it must be given a
- * {@link BigtableIO.Write#withBigtableOptions(BigtableOptions) BigtableOptions} that identifies
- * the destination Cloud Bigtable cluster and a {@link BigtableIO.Write#withTableId tableId} that
- * selects the table to write.
- */
- @Experimental
- public static Write write() {
- final String unsetTableId = "";
- return new Write(null /* options */, unsetTableId, null /* service */);
- }
-
- /**
- * A {@link PTransform} that reads from Google Cloud Bigtable. See the class-level Javadoc on
- * {@link BigtableIO} for more information.
- *
- * @see BigtableIO
- */
- @Experimental
- public static class Read extends PTransform<PBegin, PCollection<Row>> {
- /**
- * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster
- * indicated by the given options, and using any other specified customizations.
- *
- * <p>Does not modify this object.
- */
- public Read withBigtableOptions(BigtableOptions options) {
- checkNotNull(options, "options");
- return withBigtableOptions(options.toBuilder());
- }
-
- /**
- * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster
- * indicated by the given options, and using any other specified customizations.
- *
- * <p>Clones the given {@link BigtableOptions} builder so that any further changes
- * will have no effect on the returned {@link BigtableIO.Read}.
- *
- * <p>Does not modify this object.
- */
- public Read withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
- checkNotNull(optionsBuilder, "optionsBuilder");
- // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
- BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
- BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
- return new Read(optionsWithAgent, tableId, filter, bigtableService);
- }
-
- /**
- * Returns a new {@link BigtableIO.Read} that will filter the rows read from Cloud Bigtable
- * using the given row filter.
- *
- * <p>Does not modify this object.
- */
- public Read withRowFilter(RowFilter filter) {
- checkNotNull(filter, "filter");
- return new Read(options, tableId, filter, bigtableService);
- }
-
- /**
- * Returns a new {@link BigtableIO.Read} that will read from the specified table.
- *
- * <p>Does not modify this object.
- */
- public Read withTableId(String tableId) {
- checkNotNull(tableId, "tableId");
- return new Read(options, tableId, filter, bigtableService);
- }
-
- /**
- * Returns the Google Cloud Bigtable cluster being read from, and other parameters.
- */
- public BigtableOptions getBigtableOptions() {
- return options;
- }
-
- /**
- * Returns the table being read from.
- */
- public String getTableId() {
- return tableId;
- }
-
- @Override
- public PCollection<Row> apply(PBegin input) {
- BigtableSource source =
- new BigtableSource(getBigtableService(), tableId, filter, ByteKeyRange.ALL_KEYS, null);
- return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
- }
-
- @Override
- public void validate(PBegin input) {
- checkArgument(options != null, "BigtableOptions not specified");
- checkArgument(!tableId.isEmpty(), "Table ID not specified");
- try {
- checkArgument(
- getBigtableService().tableExists(tableId), "Table %s does not exist", tableId);
- } catch (IOException e) {
- logger.warn("Error checking whether table {} exists; proceeding.", tableId, e);
- }
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
-
- builder.add("tableId", tableId);
-
- if (options != null) {
- builder.add("bigtableOptions", options.toString());
- }
-
- if (filter != null) {
- builder.add("rowFilter", filter.toString());
- }
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(Read.class)
- .add("options", options)
- .add("tableId", tableId)
- .add("filter", filter)
- .toString();
- }
-
- /////////////////////////////////////////////////////////////////////////////////////////
- /**
- * Used to define the Cloud Bigtable cluster and any options for the networking layer.
- * Cannot actually be {@code null} at validation time, but may start out {@code null} while
- * source is being built.
- */
- @Nullable private final BigtableOptions options;
- private final String tableId;
- @Nullable private final RowFilter filter;
- @Nullable private final BigtableService bigtableService;
-
- private Read(
- @Nullable BigtableOptions options,
- String tableId,
- @Nullable RowFilter filter,
- @Nullable BigtableService bigtableService) {
- this.options = options;
- this.tableId = checkNotNull(tableId, "tableId");
- this.filter = filter;
- this.bigtableService = bigtableService;
- }
-
-    /**
-     * Returns a copy of this {@link BigtableIO.Read} that talks to {@code bigtableService} instead
-     * of creating a real Cloud Bigtable service. Intended for tests; this object is unchanged.
-     */
-    Read withBigtableService(BigtableService bigtableService) {
-      BigtableService service = checkNotNull(bigtableService, "bigtableService");
-      return new Read(options, tableId, filter, service);
-    }
-
-    /**
-     * Returns the service injected through {@link #withBigtableService}, or else a new
-     * {@link BigtableServiceImpl} built from this transform's options.
-     */
-    private BigtableService getBigtableService() {
-      return bigtableService != null ? bigtableService : new BigtableServiceImpl(options);
-    }
- }
-
-  /**
-   * A {@link PTransform} that writes to Google Cloud Bigtable. See the class-level Javadoc on
-   * {@link BigtableIO} for more information.
-   *
-   * @see BigtableIO
-   */
-  @Experimental
-  public static class Write
-      extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, PDone> {
-    /**
-     * Identifies the Cloud Bigtable cluster and networking-layer options. May be {@code null}
-     * while the transform is being configured; {@link #validate} rejects a {@code null} value.
-     */
-    @Nullable private final BigtableOptions options;
-
-    /** Destination table; an empty value means no table has been specified yet. */
-    private final String tableId;
-
-    /** Service injected by tests; when {@code null}, a real service is built from options. */
-    @Nullable private final BigtableService bigtableService;
-
-    private Write(
-        @Nullable BigtableOptions options,
-        String tableId,
-        @Nullable BigtableService bigtableService) {
-      this.tableId = checkNotNull(tableId, "tableId");
-      this.options = options;
-      this.bigtableService = bigtableService;
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Write} targeting the Cloud Bigtable cluster described by
-     * {@code options}, keeping every other setting of this transform.
-     *
-     * <p>Does not modify this object.
-     */
-    public Write withBigtableOptions(BigtableOptions options) {
-      BigtableOptions requiredOptions = checkNotNull(options, "options");
-      return withBigtableOptions(requiredOptions.toBuilder());
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Write} targeting the Cloud Bigtable cluster described by
-     * {@code optionsBuilder}, keeping every other setting of this transform. The builder is
-     * snapshotted, so later changes to it do not affect the returned transform.
-     *
-     * <p>Does not modify this object.
-     */
-    public Write withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
-      checkNotNull(optionsBuilder, "optionsBuilder");
-      // build() followed by toBuilder() yields a private builder the caller can no longer mutate.
-      // TODO: is there a better way to clone a Builder?
-      BigtableOptions.Builder privateCopy = optionsBuilder.build().toBuilder();
-      BigtableOptions finalOptions = privateCopy.setUserAgent(getUserAgent()).build();
-      return new Write(finalOptions, tableId, bigtableService);
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Write} that writes to the table {@code tableId}.
-     *
-     * <p>Does not modify this object.
-     */
-    public Write withTableId(String tableId) {
-      return new Write(options, checkNotNull(tableId, "tableId"), bigtableService);
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Write} that talks to {@code bigtableService} instead of a
-     * real Cloud Bigtable service. Intended for tests; does not modify this object.
-     */
-    Write withBigtableService(BigtableService bigtableService) {
-      BigtableService service = checkNotNull(bigtableService, "bigtableService");
-      return new Write(options, tableId, service);
-    }
-
-    /** Returns the options describing the Cloud Bigtable cluster being written to. */
-    public BigtableOptions getBigtableOptions() {
-      return options;
-    }
-
-    /** Returns the ID of the table being written to. */
-    public String getTableId() {
-      return tableId;
-    }
-
-    @Override
-    public PDone apply(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
-      Sink bigtableSink = new Sink(tableId, getBigtableService());
-      return input.apply(org.apache.beam.sdk.io.Write.to(bigtableSink));
-    }
-
-    /**
-     * Verifies that options and a non-empty table ID were supplied and, when the existence check
-     * itself succeeds, that the table exists. An I/O failure during the existence check is only
-     * logged, and validation proceeds.
-     */
-    @Override
-    public void validate(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
-      checkArgument(options != null, "BigtableOptions not specified");
-      checkArgument(!tableId.isEmpty(), "Table ID not specified");
-      boolean exists;
-      try {
-        exists = getBigtableService().tableExists(tableId);
-      } catch (IOException e) {
-        logger.warn("Error checking whether table {} exists; proceeding.", tableId, e);
-        return;
-      }
-      checkArgument(exists, "Table %s does not exist", tableId);
-    }
-
-    /** Reports the table ID and, when set, the Bigtable options. */
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      builder.add("tableId", tableId);
-      BigtableOptions configuredOptions = options;
-      if (configuredOptions != null) {
-        builder.add("bigtableOptions", configuredOptions.toString());
-      }
-    }
-
-    @Override
-    public String toString() {
-      MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(Write.class);
-      helper.add("options", options);
-      helper.add("tableId", tableId);
-      return helper.toString();
-    }
-
-    /**
-     * Returns the service injected through {@link #withBigtableService}, or else a new
-     * {@link BigtableServiceImpl} built from this transform's options.
-     */
-    private BigtableService getBigtableService() {
-      return bigtableService != null ? bigtableService : new BigtableServiceImpl(options);
-    }
-  }
-
-  //////////////////////////////////////////////////////////////////////////////////////////
-  /** Utility class: holds only static members and nested types, so it is never instantiated. */
-  private BigtableIO() {
-    // Intentionally empty.
-  }
-
-  /**
-   * A {@link BoundedSource} that reads the {@link Row}s of one Cloud Bigtable table that fall in a
-   * {@link ByteKeyRange}, optionally applying a {@link RowFilter}.
-   *
-   * <p>Initial splitting is driven by the row-key samples returned by
-   * {@link BigtableService#getSampleRowKeys}.
-   */
-  static class BigtableSource extends BoundedSource<Row> {
-    /**
-     * Creates a source.
-     *
-     * @param service the service used to sample row keys and to create readers
-     * @param tableId the table to read
-     * @param filter row filter sent with each read request, or {@code null} for none
-     * @param range the key range to read
-     * @param estimatedSizeBytes a known size estimate, or {@code null} to have
-     *     {@link #getEstimatedSizeBytes} compute one from sampled row keys
-     */
-    public BigtableSource(
-        BigtableService service,
-        String tableId,
-        @Nullable RowFilter filter,
-        ByteKeyRange range,
-        @Nullable Long estimatedSizeBytes) {
-      this.service = service;
-      this.tableId = tableId;
-      this.filter = filter;
-      this.range = range;
-      this.estimatedSizeBytes = estimatedSizeBytes;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(BigtableSource.class)
-          .add("tableId", tableId)
-          .add("filter", filter)
-          .add("range", range)
-          .add("estimatedSizeBytes", estimatedSizeBytes)
-          .toString();
-    }
-
-    ////// Private state and internal implementation details //////
-    private final BigtableService service;
-    @Nullable private final String tableId;
-    @Nullable private final RowFilter filter;
-    private final ByteKeyRange range;
-    // Filled in lazily by getEstimatedSizeBytes when not supplied at construction.
-    @Nullable private Long estimatedSizeBytes;
-    @Nullable private transient List<SampleRowKeysResponse> sampleRowKeys;
-
-    // NOTE(review): withStartKey/withEndKey carry over this source's size estimate unchanged even
-    // though the range shrinks, so a narrowed source may over-report its size. Confirm intended.
-
-    /** Returns a copy of this source whose range starts at {@code startKey}. */
-    protected BigtableSource withStartKey(ByteKey startKey) {
-      checkNotNull(startKey, "startKey");
-      return new BigtableSource(
-          service, tableId, filter, range.withStartKey(startKey), estimatedSizeBytes);
-    }
-
-    /** Returns a copy of this source whose range ends at {@code endKey}. */
-    protected BigtableSource withEndKey(ByteKey endKey) {
-      checkNotNull(endKey, "endKey");
-      return new BigtableSource(
-          service, tableId, filter, range.withEndKey(endKey), estimatedSizeBytes);
-    }
-
-    /** Returns a copy of this source with the given size estimate. */
-    protected BigtableSource withEstimatedSizeBytes(Long estimatedSizeBytes) {
-      checkNotNull(estimatedSizeBytes, "estimatedSizeBytes");
-      return new BigtableSource(service, tableId, filter, range, estimatedSizeBytes);
-    }
-
-    /**
-     * Makes an API call to the Cloud Bigtable service that gives information about tablet key
-     * boundaries and estimated sizes. We can use these samples to ensure that splits are on
-     * different tablets, and possibly generate sub-splits within tablets.
-     */
-    private List<SampleRowKeysResponse> getSampleRowKeys() throws IOException {
-      return service.getSampleRowKeys(this);
-    }
-
-    /**
-     * Splits this source along sampled row-key boundaries. The requested bundle size is raised,
-     * if necessary, to at least one 4000th of the estimated size so that the number of splits
-     * stays roughly bounded.
-     */
-    @Override
-    public List<BigtableSource> splitIntoBundles(
-        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-      // Update the desiredBundleSizeBytes in order to limit the
-      // number of splits to maximumNumberOfSplits.
-      long maximumNumberOfSplits = 4000;
-      long sizeEstimate = getEstimatedSizeBytes(options);
-      desiredBundleSizeBytes =
-          Math.max(sizeEstimate / maximumNumberOfSplits, desiredBundleSizeBytes);
-
-      // Delegate to testable helper.
-      return splitIntoBundlesBasedOnSamples(desiredBundleSizeBytes, getSampleRowKeys());
-    }
-
-    /**
-     * Helper that splits this source into bundles based on Cloud Bigtable sampled row keys.
-     *
-     * <p>Each sample ends a region that starts at the previous sample's key (or at the start of
-     * the table for the first sample). An empty sample key marks the end of the table. Regions
-     * overlapping this source's range are clipped to the range and sub-split by size.
-     */
-    private List<BigtableSource> splitIntoBundlesBasedOnSamples(
-        long desiredBundleSizeBytes, List<SampleRowKeysResponse> sampleRowKeys) {
-      // There are no regions, or no samples available. Just scan the entire range.
-      if (sampleRowKeys.isEmpty()) {
-        logger.info("Not splitting source {} because no sample row keys are available.", this);
-        return Collections.singletonList(this);
-      }
-
-      logger.info(
-          "About to split into bundles of size {} with sampleRowKeys length {} first element {}",
-          desiredBundleSizeBytes,
-          sampleRowKeys.size(),
-          sampleRowKeys.get(0));
-
-      // Loop through all sampled responses and generate splits from the ones that overlap the
-      // scan range. The main complication is that we must track the end range of the previous
-      // sample to generate good ranges.
-      ByteKey lastEndKey = ByteKey.EMPTY;
-      long lastOffset = 0;
-      ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder();
-      for (SampleRowKeysResponse response : sampleRowKeys) {
-        ByteKey responseEndKey = ByteKey.of(response.getRowKey());
-        long responseOffset = response.getOffsetBytes();
-        checkState(
-            responseOffset >= lastOffset,
-            "Expected response byte offset %s to come after the last offset %s",
-            responseOffset,
-            lastOffset);
-
-        if (!range.overlaps(ByteKeyRange.of(lastEndKey, responseEndKey))) {
-          // This region does not overlap the scan, so skip it.
-          lastOffset = responseOffset;
-          lastEndKey = responseEndKey;
-          continue;
-        }
-
-        // Calculate the beginning of the split as the larger of startKey and the end of the last
-        // split. Unspecified start is smallest key so is correctly treated as earliest key.
-        ByteKey splitStartKey = lastEndKey;
-        if (splitStartKey.compareTo(range.getStartKey()) < 0) {
-          splitStartKey = range.getStartKey();
-        }
-
-        // Calculate the end of the split as the smaller of endKey and the end of this sample.
-        // range.containsKey handles the case when range.getEndKey() is empty. An empty sample key,
-        // however, means "end of table", whereas containsKey treats it as the smallest key and
-        // would accept it whenever the range starts at the beginning of the table. In that case
-        // the split must still stop at this source's end key.
-        ByteKey splitEndKey = responseEndKey;
-        if (splitEndKey.isEmpty() || !range.containsKey(splitEndKey)) {
-          splitEndKey = range.getEndKey();
-        }
-
-        // We know this region overlaps the desired key range, and we know a rough estimate of its
-        // size. Split the key range into bundle-sized chunks and then add them all as splits.
-        long sampleSizeBytes = responseOffset - lastOffset;
-        List<BigtableSource> subSplits =
-            splitKeyRangeIntoBundleSizedSubranges(
-                sampleSizeBytes,
-                desiredBundleSizeBytes,
-                ByteKeyRange.of(splitStartKey, splitEndKey));
-        splits.addAll(subSplits);
-
-        // Move to the next region.
-        lastEndKey = responseEndKey;
-        lastOffset = responseOffset;
-      }
-
-      // We must add one more region after the end of the samples if both these conditions hold:
-      //  1. we did not scan to the end yet (lastEndKey is concrete, not 0-length).
-      //  2. we want to scan to the end (endKey is empty) or farther (lastEndKey < endKey).
-      if (!lastEndKey.isEmpty()
-          && (range.getEndKey().isEmpty() || lastEndKey.compareTo(range.getEndKey()) < 0)) {
-        splits.add(this.withStartKey(lastEndKey).withEndKey(range.getEndKey()));
-      }
-
-      List<BigtableSource> ret = splits.build();
-      if (ret.isEmpty()) {
-        // No sampled region overlapped this source's range (e.g. an empty range). Fall back to
-        // the unsplit source, as in the no-samples case, instead of failing on ret.get(0).
-        logger.info("No splits generated for source {}; not splitting.", this);
-        return Collections.singletonList(this);
-      }
-      logger.info("Generated {} splits. First split: {}", ret.size(), ret.get(0));
-      return ret;
-    }
-
-    /**
-     * Returns the size estimate supplied at construction, or computes (and caches) one from the
-     * sampled row keys.
-     */
-    @Override
-    public long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
-      // Delegate to testable helper.
-      if (estimatedSizeBytes == null) {
-        estimatedSizeBytes = getEstimatedSizeBytesBasedOnSamples(getSampleRowKeys());
-      }
-      return estimatedSizeBytes;
-    }
-
-    /**
-     * Computes the estimated size in bytes based on the total size of all samples that overlap
-     * the key range this source will scan.
-     */
-    private long getEstimatedSizeBytesBasedOnSamples(List<SampleRowKeysResponse> samples) {
-      long estimatedSizeBytes = 0;
-      long lastOffset = 0;
-      ByteKey currentStartKey = ByteKey.EMPTY;
-      // Compute the total estimated size as the size of each sample that overlaps the scan range.
-      // TODO: In future, Bigtable service may provide finer grained APIs, e.g., to sample given a
-      // filter or to sample on a given key range.
-      for (SampleRowKeysResponse response : samples) {
-        ByteKey currentEndKey = ByteKey.of(response.getRowKey());
-        long currentOffset = response.getOffsetBytes();
-        if (!currentStartKey.isEmpty() && currentStartKey.equals(currentEndKey)) {
-          // Skip an empty region.
-          lastOffset = currentOffset;
-          continue;
-        } else if (range.overlaps(ByteKeyRange.of(currentStartKey, currentEndKey))) {
-          estimatedSizeBytes += currentOffset - lastOffset;
-        }
-        currentStartKey = currentEndKey;
-        lastOffset = currentOffset;
-      }
-      return estimatedSizeBytes;
-    }
-
-    /**
-     * Cloud Bigtable returns query results ordered by key.
-     */
-    @Override
-    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
-      return true;
-    }
-
-    @Override
-    public BoundedReader<Row> createReader(PipelineOptions options) throws IOException {
-      return new BigtableReader(this, service);
-    }
-
-    @Override
-    public void validate() {
-      checkArgument(!tableId.isEmpty(), "tableId cannot be empty");
-    }
-
-    @Override
-    public Coder<Row> getDefaultOutputCoder() {
-      return ProtoCoder.of(Row.class);
-    }
-
-    /**
-     * Helper that splits the specified range in this source into bundles of roughly
-     * {@code desiredBundleSizeBytes}, assuming {@code sampleSizeBytes} is spread evenly over the
-     * range.
-     */
-    private List<BigtableSource> splitKeyRangeIntoBundleSizedSubranges(
-        long sampleSizeBytes, long desiredBundleSizeBytes, ByteKeyRange range) {
-      // Catch the trivial cases. Split is small enough already, or this is the last region.
-      logger.debug(
-          "Subsplit for sampleSizeBytes {} and desiredBundleSizeBytes {}",
-          sampleSizeBytes,
-          desiredBundleSizeBytes);
-      if (sampleSizeBytes <= desiredBundleSizeBytes) {
-        return Collections.singletonList(
-            this.withStartKey(range.getStartKey()).withEndKey(range.getEndKey()));
-      }
-
-      checkArgument(
-          sampleSizeBytes > 0, "Sample size %s bytes must be greater than 0.", sampleSizeBytes);
-      checkArgument(
-          desiredBundleSizeBytes > 0,
-          "Desired bundle size %s bytes must be greater than 0.",
-          desiredBundleSizeBytes);
-
-      int splitCount = (int) Math.ceil(((double) sampleSizeBytes) / (desiredBundleSizeBytes));
-      List<ByteKey> splitKeys = range.split(splitCount);
-      ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder();
-      Iterator<ByteKey> keys = splitKeys.iterator();
-      ByteKey prev = keys.next();
-      while (keys.hasNext()) {
-        ByteKey next = keys.next();
-        splits.add(
-            this
-                .withStartKey(prev)
-                .withEndKey(next)
-                .withEstimatedSizeBytes(sampleSizeBytes / splitCount));
-        prev = next;
-      }
-      return splits.build();
-    }
-
-    /** Returns the key range this source reads. */
-    public ByteKeyRange getRange() {
-      return range;
-    }
-
-    /** Returns the row filter applied to reads, or {@code null} if none. */
-    public RowFilter getRowFilter() {
-      return filter;
-    }
-
-    /** Returns the ID of the table this source reads. */
-    public String getTableId() {
-      return tableId;
-    }
-  }
-
-  /**
-   * A {@link BoundedReader} over a {@link BigtableSource}. Progress is tracked with a
-   * {@link ByteKeyRangeTracker} so the reader can report the fraction consumed and accept dynamic
-   * splits.
-   */
-  private static class BigtableReader extends BoundedReader<Row> {
-    // Thread-safety: source is protected via synchronization and is only accessed or modified
-    // inside a synchronized block (or constructor, which is the same).
-    private BigtableSource source;
-    private final BigtableService service;
-    // Null before start() and again after close().
-    private BigtableService.Reader reader;
-    private final ByteKeyRangeTracker rangeTracker;
-    private long recordsReturned;
-
-    public BigtableReader(BigtableSource source, BigtableService service) {
-      this.source = source;
-      this.service = service;
-      rangeTracker = ByteKeyRangeTracker.of(source.getRange());
-    }
-
-    /**
-     * Opens the service reader for the current source and reads the first row. The row is only
-     * returned if the range tracker accepts its key.
-     */
-    @Override
-    public boolean start() throws IOException {
-      reader = service.createReader(getCurrentSource());
-      boolean hasRecord =
-          reader.start()
-              && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey()));
-      if (hasRecord) {
-        ++recordsReturned;
-      }
-      return hasRecord;
-    }
-
-    @Override
-    public synchronized BigtableSource getCurrentSource() {
-      return source;
-    }
-
-    /**
-     * Reads the next row. Returns {@code false} once the service reader is exhausted or the range
-     * tracker rejects the row's key.
-     */
-    @Override
-    public boolean advance() throws IOException {
-      boolean hasRecord =
-          reader.advance()
-              && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey()));
-      if (hasRecord) {
-        ++recordsReturned;
-      }
-      return hasRecord;
-    }
-
-    /**
-     * Returns the current row.
-     *
-     * @throws NoSuchElementException if there is no current row, including before {@link #start}
-     *     and after {@link #close}
-     */
-    @Override
-    public Row getCurrent() throws NoSuchElementException {
-      if (reader == null) {
-        throw new NoSuchElementException();
-      }
-      return reader.getCurrentRow();
-    }
-
-    @Override
-    public void close() throws IOException {
-      logger.info("Closing reader after reading {} records.", recordsReturned);
-      if (reader != null) {
-        reader.close();
-        reader = null;
-      }
-    }
-
-    @Override
-    public final Double getFractionConsumed() {
-      return rangeTracker.getFractionConsumed();
-    }
-
-    /**
-     * Attempts to split the remaining work at {@code fraction} of the source's key range. On
-     * success this reader keeps the primary part and the residual part is returned. Otherwise
-     * {@code null} is returned and nothing changes.
-     */
-    @Override
-    public final synchronized BigtableSource splitAtFraction(double fraction) {
-      ByteKey splitKey;
-      try {
-        splitKey = source.getRange().interpolateKey(fraction);
-      } catch (IllegalArgumentException e) {
-        // SLF4J substitutes "{}" placeholders; "%s" would have been logged literally.
-        logger.info("{}: Failed to interpolate key for fraction {}.", source.getRange(), fraction);
-        return null;
-      }
-      logger.debug(
-          "Proposing to split {} at fraction {} (key {})", rangeTracker, fraction, splitKey);
-      if (!rangeTracker.trySplitAtPosition(splitKey)) {
-        return null;
-      }
-      BigtableSource primary = source.withEndKey(splitKey);
-      BigtableSource residual = source.withStartKey(splitKey);
-      this.source = primary;
-      return residual;
-    }
-  }
-
-  /**
-   * A {@link org.apache.beam.sdk.io.Sink} that applies row mutations to one Cloud Bigtable table
-   * through a {@link BigtableService}.
-   */
-  private static class Sink
-      extends org.apache.beam.sdk.io.Sink<KV<ByteString, Iterable<Mutation>>> {
-    private final String tableId;
-    private final BigtableService bigtableService;
-
-    public Sink(String tableId, BigtableService bigtableService) {
-      this.tableId = checkNotNull(tableId, "tableId");
-      this.bigtableService = checkNotNull(bigtableService, "bigtableService");
-    }
-
-    /** Returns the ID of the destination table. */
-    public String getTableId() {
-      return tableId;
-    }
-
-    /** Returns the service used to open writers. */
-    public BigtableService getBigtableService() {
-      return bigtableService;
-    }
-
-    @Override
-    public WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> createWriteOperation(
-        PipelineOptions options) {
-      return new BigtableWriteOperation(this);
-    }
-
-    /** Does nothing, as it is redundant with {@link Write#validate}. */
-    @Override
-    public void validate(PipelineOptions options) {
-      // Intentionally empty.
-    }
-
-    @Override
-    public String toString() {
-      MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(Sink.class);
-      helper.add("bigtableService", bigtableService);
-      helper.add("tableId", tableId);
-      return helper.toString();
-    }
-  }
-
-  /**
-   * The {@link WriteOperation} for {@link Sink}. It creates {@link BigtableWriter}s, and
-   * {@link #finalize} sums the record counts those writers report.
-   */
-  private static class BigtableWriteOperation
-      extends WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> {
-    private final Sink sink;
-
-    public BigtableWriteOperation(Sink sink) {
-      this.sink = sink;
-    }
-
-    @Override
-    public Writer<KV<ByteString, Iterable<Mutation>>, Long> createWriter(PipelineOptions options)
-        throws Exception {
-      return new BigtableWriter(this);
-    }
-
-    @Override
-    public void initialize(PipelineOptions options) {}
-
-    /**
-     * Totals the per-writer record counts and logs the result.
-     *
-     * @param writerResults the values returned by each {@link BigtableWriter#close}
-     */
-    @Override
-    public void finalize(Iterable<Long> writerResults, PipelineOptions options) {
-      long count = 0;
-      for (Long value : writerResults) {
-        // Accumulate into count; the previous code added count to the loop variable instead.
-        count += value;
-      }
-      logger.debug("Wrote {} elements to BigtableIO.Sink {}", count, sink);
-    }
-
-    @Override
-    public Sink getSink() {
-      return sink;
-    }
-
-    @Override
-    public Coder<Long> getWriterResultCoder() {
-      return VarLongCoder.of();
-    }
-  }
-
-  /**
-   * Writes row mutations through a {@link BigtableService.Writer}. Writes complete
-   * asynchronously. A callback queues their failures, and the queued failures are raised as an
-   * {@link IOException} on the next {@link #write} or on {@link #close}.
-   */
-  private static class BigtableWriter extends Writer<KV<ByteString, Iterable<Mutation>>, Long> {
-    private final BigtableWriteOperation writeOperation;
-    private final Sink sink;
-    private BigtableService.Writer bigtableWriter;
-    private long recordsWritten;
-    // Appended to by WriteExceptionCallback (possibly from other threads); drained only by
-    // checkForFailures.
-    private final ConcurrentLinkedQueue<BigtableWriteException> failures =
-        new ConcurrentLinkedQueue<>();
-
-    public BigtableWriter(BigtableWriteOperation writeOperation) {
-      this.writeOperation = writeOperation;
-      this.sink = writeOperation.getSink();
-    }
-
-    @Override
-    public void open(String uId) throws Exception {
-      BigtableService service = sink.getBigtableService();
-      bigtableWriter = service.openForWriting(sink.getTableId());
-      recordsWritten = 0;
-    }
-
-    /**
-     * Fails the bundle with a descriptive error if any asynchronous write has failed. At most ten
-     * failures are described; the message also reports a lower bound on the total count.
-     */
-    private void checkForFailures() throws IOException {
-      // This method is never called concurrently and is the only consumer of failures, so
-      // draining the queue here is safe while callbacks keep appending to it.
-      if (failures.isEmpty()) {
-        return;
-      }
-
-      StringBuilder logEntry = new StringBuilder();
-      int reported = 0;
-      while (reported < 10 && !failures.isEmpty()) {
-        BigtableWriteException failure = failures.remove();
-        logEntry.append("\n").append(failure.getMessage());
-        Throwable cause = failure.getCause();
-        if (cause != null) {
-          logEntry.append(": ").append(cause.getMessage());
-        }
-        reported++;
-      }
-      String message =
-          String.format(
-              "At least %d errors occurred writing to Bigtable. First %d errors: %s",
-              reported + failures.size(),
-              reported,
-              logEntry.toString());
-      logger.error(message);
-      throw new IOException(message);
-    }
-
-    /** Surfaces earlier failures, then submits {@code rowMutations} asynchronously. */
-    @Override
-    public void write(KV<ByteString, Iterable<Mutation>> rowMutations) throws Exception {
-      checkForFailures();
-      Futures.addCallback(
-          bigtableWriter.writeRecord(rowMutations), new WriteExceptionCallback(rowMutations));
-      recordsWritten++;
-    }
-
-    /** Closes the service writer, surfaces any failures, and returns the number of records. */
-    @Override
-    public Long close() throws Exception {
-      bigtableWriter.close();
-      bigtableWriter = null;
-      checkForFailures();
-      logger.info("Wrote {} records", recordsWritten);
-      return recordsWritten;
-    }
-
-    @Override
-    public WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> getWriteOperation() {
-      return writeOperation;
-    }
-
-    /** Records a failed write, with the record it concerned, for checkForFailures to report. */
-    private class WriteExceptionCallback implements FutureCallback<Empty> {
-      private final KV<ByteString, Iterable<Mutation>> value;
-
-      public WriteExceptionCallback(KV<ByteString, Iterable<Mutation>> value) {
-        this.value = value;
-      }
-
-      @Override
-      public void onSuccess(Empty produced) {
-        // Nothing to record.
-      }
-
-      @Override
-      public void onFailure(Throwable cause) {
-        failures.add(new BigtableWriteException(value, cause));
-      }
-    }
-  }
-
-  /**
-   * An {@link IOException} for one failed row write. Its message names the row key (decoded as
-   * UTF-8) and the mutations that were attempted.
-   */
-  static class BigtableWriteException extends IOException {
-    public BigtableWriteException(KV<ByteString, Iterable<Mutation>> record, Throwable cause) {
-      super(describe(record), cause);
-    }
-
-    /** Formats the message for a failed {@code record}. */
-    private static String describe(KV<ByteString, Iterable<Mutation>> record) {
-      String rowKey = record.getKey().toStringUtf8();
-      return String.format("Error mutating row %s with mutations %s", rowKey, record.getValue());
-    }
-  }
-
-  /**
-   * Builds the user agent string handed to the Cloud Bigtable client, formatted as
-   * {@code <release name>/<release version> (<java spec version>); <bigtable client version>}.
-   */
-  private static String getUserAgent() {
-    String javaSpecVersion = System.getProperty("java.specification.version");
-    ReleaseInfo releaseInfo = ReleaseInfo.getReleaseInfo();
-    // TODO: get the Bigtable client version directly from its jar instead of hard-coding it.
-    String bigtableClientVersion = "0.2.3";
-    return String.format(
-        "%s/%s (%s); %s",
-        releaseInfo.getName(),
-        releaseInfo.getVersion(),
-        javaSpecVersion,
-        bigtableClientVersion);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableService.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableService.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableService.java
deleted file mode 100644
index cfae0ef..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableService.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.bigtable;
-
-import org.apache.beam.sdk.io.bigtable.BigtableIO.BigtableSource;
-import org.apache.beam.sdk.values.KV;
-
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.SampleRowKeysResponse;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * Abstraction over Cloud Bigtable so that {@link BigtableIO} can run against either the real
- * service or a fake implementation.
- */
-interface BigtableService extends Serializable {
-
-  /** Returns {@code true} if the table with the given ID exists. */
-  boolean tableExists(String tableId) throws IOException;
-
-  /** Returns a {@link Reader} over the rows described by {@code source}. */
-  Reader createReader(BigtableSource source) throws IOException;
-
-  /** Returns a {@link Writer} that applies mutations to the table {@code tableId}. */
-  Writer openForWriting(String tableId) throws IOException;
-
-  /**
-   * Returns row keys sampled from the table read by {@code source}; they describe how keys are
-   * distributed within that table.
-   */
-  List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) throws IOException;
-
-  /** Something that can write row mutations to Cloud Bigtable. */
-  interface Writer {
-    /**
-     * Submits a single-row transaction. The key of {@code record} is the row to change, and its
-     * value is the sequence of mutations to apply to that row.
-     *
-     * @return a future that completes when the write finishes
-     * @throws IOException if the write cannot be submitted
-     */
-    ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record)
-        throws IOException;
-
-    /**
-     * Closes this writer.
-     *
-     * @throws IOException if any writes did not succeed
-     */
-    void close() throws IOException;
-  }
-
-  /** Something that can read rows from Cloud Bigtable. */
-  interface Reader {
-    /**
-     * Performs any setup (such as opening a network connection) and reads the first row,
-     * returning {@code true} if one was found.
-     */
-    boolean start() throws IOException;
-
-    /** Reads the next row, returning {@code true} if one was found. */
-    boolean advance() throws IOException;
-
-    /**
-     * Closes this reader.
-     *
-     * @throws IOException on failure
-     */
-    void close() throws IOException;
-
-    /**
-     * Returns the row read by the most recent successful {@link #start} or {@link #advance}.
-     *
-     * @throws NoSuchElementException if the most recent such call did not produce a row
-     */
-    Row getCurrentRow() throws NoSuchElementException;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableServiceImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableServiceImpl.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableServiceImpl.java
deleted file mode 100644
index 87651f2..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableServiceImpl.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.bigtable;
-
-import org.apache.beam.sdk.io.bigtable.BigtableIO.BigtableSource;
-import org.apache.beam.sdk.values.KV;
-
-import com.google.bigtable.admin.table.v1.GetTableRequest;
-import com.google.bigtable.v1.MutateRowRequest;
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.ReadRowsRequest;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.RowRange;
-import com.google.bigtable.v1.SampleRowKeysRequest;
-import com.google.bigtable.v1.SampleRowKeysResponse;
-import com.google.cloud.bigtable.config.BigtableOptions;
-import com.google.cloud.bigtable.grpc.BigtableSession;
-import com.google.cloud.bigtable.grpc.async.AsyncExecutor;
-import com.google.cloud.bigtable.grpc.async.HeapSizeManager;
-import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
-import com.google.common.base.MoreObjects;
-import com.google.common.io.Closer;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
-
-import io.grpc.Status.Code;
-import io.grpc.StatusRuntimeException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * An implementation of {@link BigtableService} that actually communicates with the Cloud Bigtable
- * service.
- *
- * <p>Every operation opens its own {@link BigtableSession}. One-shot calls close it before
- * returning, while readers and writers own their session and close it in {@code close()}.
- */
-class BigtableServiceImpl implements BigtableService {
-  private static final Logger logger = LoggerFactory.getLogger(BigtableServiceImpl.class);
-
-  public BigtableServiceImpl(BigtableOptions options) {
-    this.options = options;
-  }
-
-  private final BigtableOptions options;
-
-  /**
-   * Opens a new session and returns a writer that owns it. If the writer cannot be constructed,
-   * the session is closed before the error propagates.
-   */
-  @Override
-  public BigtableWriterImpl openForWriting(String tableId) throws IOException {
-    String tableName = options.getClusterName().toTableNameStr(tableId);
-    BigtableSession session = new BigtableSession(options);
-    try {
-      return new BigtableWriterImpl(session, tableName);
-    } catch (RuntimeException e) {
-      // The writer never took ownership of the session, so release it here.
-      try {
-        session.close();
-      } catch (Exception closeFailure) {
-        e.addSuppressed(closeFailure);
-      }
-      throw e;
-    }
-  }
-
-  /**
-   * Returns whether the table exists. Returns {@code true} without checking when ALPN is not
-   * available. Errors other than NOT_FOUND are rethrown as {@link IOException}.
-   */
-  @Override
-  public boolean tableExists(String tableId) throws IOException {
-    if (!BigtableSession.isAlpnProviderEnabled()) {
-      logger.info(
-          "Skipping existence check for table {} (BigtableOptions {}) because ALPN is not"
-              + " configured.",
-          tableId,
-          options);
-      return true;
-    }
-
-    try (BigtableSession session = new BigtableSession(options)) {
-      GetTableRequest getTable =
-          GetTableRequest.newBuilder()
-              .setName(options.getClusterName().toTableNameStr(tableId))
-              .build();
-      session.getTableAdminClient().getTable(getTable);
-      return true;
-    } catch (StatusRuntimeException e) {
-      if (e.getStatus().getCode() == Code.NOT_FOUND) {
-        return false;
-      }
-      String message =
-          String.format(
-              "Error checking whether table %s (BigtableOptions %s) exists", tableId, options);
-      logger.error(message, e);
-      throw new IOException(message, e);
-    }
-  }
-
-  /** Reads the rows of a {@link BigtableSource} over a session that this reader owns. */
-  private class BigtableReaderImpl implements Reader {
-    private BigtableSession session;
-    private final BigtableSource source;
-    private ResultScanner<Row> results;
-    private Row currentRow;
-
-    public BigtableReaderImpl(BigtableSession session, BigtableSource source) {
-      this.session = session;
-      this.source = source;
-    }
-
-    /**
-     * Issues a ReadRows request for the source's key range and row filter, if any, and reads the
-     * first row.
-     */
-    @Override
-    public boolean start() throws IOException {
-      RowRange range =
-          RowRange.newBuilder()
-              .setStartKey(source.getRange().getStartKey().getValue())
-              .setEndKey(source.getRange().getEndKey().getValue())
-              .build();
-      ReadRowsRequest.Builder requestB =
-          ReadRowsRequest.newBuilder()
-              .setRowRange(range)
-              .setTableName(options.getClusterName().toTableNameStr(source.getTableId()));
-      if (source.getRowFilter() != null) {
-        requestB.setFilter(source.getRowFilter());
-      }
-      results = session.getDataClient().readRows(requestB.build());
-      return advance();
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      currentRow = results.next();
-      return (currentRow != null);
-    }
-
-    @Override
-    public void close() throws IOException {
-      // Goal: by the end of this function, both results and session are null and closed,
-      // independent of what errors they throw or prior state.
-
-      if (session == null) {
-        // Only possible when previously closed, so we know that results is also null.
-        return;
-      }
-
-      // Session does not implement Closeable -- it's AutoCloseable. So we can't register it with
-      // the Closer, but we can use the Closer to simplify the error handling.
-      try (Closer closer = Closer.create()) {
-        if (results != null) {
-          closer.register(results);
-          results = null;
-        }
-
-        session.close();
-      } finally {
-        session = null;
-      }
-    }
-
-    @Override
-    public Row getCurrentRow() throws NoSuchElementException {
-      if (currentRow == null) {
-        throw new NoSuchElementException();
-      }
-      return currentRow;
-    }
-  }
-
-  /**
-   * Submits row mutations through an {@link AsyncExecutor}. {@link #close} flushes the executor
-   * and then closes the session this writer owns.
-   */
-  private static class BigtableWriterImpl implements Writer {
-    private BigtableSession session;
-    private AsyncExecutor executor;
-    // Pre-populated with the table name; cloned for each record.
-    private final MutateRowRequest.Builder partialBuilder;
-
-    public BigtableWriterImpl(BigtableSession session, String tableName) {
-      this.session = session;
-      this.executor =
-          new AsyncExecutor(
-              session.getDataClient(),
-              new HeapSizeManager(
-                  AsyncExecutor.ASYNC_MUTATOR_MAX_MEMORY_DEFAULT,
-                  AsyncExecutor.MAX_INFLIGHT_RPCS_DEFAULT));
-
-      partialBuilder = MutateRowRequest.newBuilder().setTableName(tableName);
-    }
-
-    @Override
-    public void close() throws IOException {
-      try {
-        if (executor != null) {
-          executor.flush();
-          executor = null;
-        }
-      } finally {
-        if (session != null) {
-          session.close();
-          session = null;
-        }
-      }
-    }
-
-    @Override
-    public ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record)
-        throws IOException {
-      MutateRowRequest r =
-          partialBuilder
-              .clone()
-              .setRowKey(record.getKey())
-              .addAllMutations(record.getValue())
-              .build();
-      try {
-        return executor.mutateRowAsync(r);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new IOException("Write interrupted", e);
-      }
-    }
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects
-        .toStringHelper(BigtableServiceImpl.class)
-        .add("options", options)
-        .toString();
-  }
-
-  /** Opens a new session and returns a reader that owns it. */
-  @Override
-  public Reader createReader(BigtableSource source) throws IOException {
-    BigtableSession session = new BigtableSession(options);
-    return new BigtableReaderImpl(session, source);
-  }
-
-  /** Fetches row-key samples for the source's table using a short-lived session. */
-  @Override
-  public List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) throws IOException {
-    try (BigtableSession session = new BigtableSession(options)) {
-      SampleRowKeysRequest request =
-          SampleRowKeysRequest.newBuilder()
-              .setTableName(options.getClusterName().toTableNameStr(source.getTableId()))
-              .build();
-      return session.getDataClient().sampleRowKeys(request);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/package-info.java
deleted file mode 100644
index f094cd4..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Defines transforms for reading and writing from Google Cloud Bigtable.
- *
- * @see org.apache.beam.sdk.io.bigtable.BigtableIO
- */
-package org.apache.beam.sdk.io.bigtable;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/core/src/test/java/org/apache/beam/sdk/io/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/bigtable/BigtableIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/bigtable/BigtableIOTest.java
deleted file mode 100644
index 7c176e4..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/bigtable/BigtableIOTest.java
+++ /dev/null
@@ -1,729 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.bigtable;
-
-import static org.apache.beam.sdk.testing.SourceTestUtils.assertSourcesEqualReferenceSource;
-import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive;
-import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
-import static org.apache.beam.sdk.testing.SourceTestUtils
- .assertSplitAtFractionSucceedsAndConsistent;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Verify.verifyNotNull;
-
-import static org.hamcrest.Matchers.hasSize;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.bigtable.BigtableIO.BigtableSource;
-import org.apache.beam.sdk.io.range.ByteKey;
-import org.apache.beam.sdk.io.range.ByteKeyRange;
-import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-import com.google.bigtable.v1.Cell;
-import com.google.bigtable.v1.Column;
-import com.google.bigtable.v1.Family;
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.Mutation.SetCell;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.RowFilter;
-import com.google.bigtable.v1.SampleRowKeysResponse;
-import com.google.cloud.bigtable.config.BigtableOptions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
-
-import org.hamcrest.Matchers;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import javax.annotation.Nullable;
-
-/**
- * Unit tests for {@link BigtableIO}.
- */
-@RunWith(JUnit4.class)
-public class BigtableIOTest {
- @Rule public ExpectedException thrown = ExpectedException.none();
- @Rule public ExpectedLogs logged = ExpectedLogs.none(BigtableIO.class);
-
- /**
- * These tests requires a static instance of the {@link FakeBigtableService} because the writers
- * go through a serialization step when executing the test and would not affect passed-in objects
- * otherwise.
- */
- private static FakeBigtableService service;
- private static final BigtableOptions BIGTABLE_OPTIONS =
- new BigtableOptions.Builder()
- .setProjectId("project")
- .setClusterId("cluster")
- .setZoneId("zone")
- .build();
- private static BigtableIO.Read defaultRead =
- BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS);
- private static BigtableIO.Write defaultWrite =
- BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS);
- private Coder<KV<ByteString, Iterable<Mutation>>> bigtableCoder;
- private static final TypeDescriptor<KV<ByteString, Iterable<Mutation>>> BIGTABLE_WRITE_TYPE =
- new TypeDescriptor<KV<ByteString, Iterable<Mutation>>>() {};
-
- @Before
- public void setup() throws Exception {
- service = new FakeBigtableService();
- defaultRead = defaultRead.withBigtableService(service);
- defaultWrite = defaultWrite.withBigtableService(service);
- bigtableCoder = TestPipeline.create().getCoderRegistry().getCoder(BIGTABLE_WRITE_TYPE);
- }
-
- @Test
- public void testReadBuildsCorrectly() {
- BigtableIO.Read read =
- BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS).withTableId("table");
- assertEquals("project", read.getBigtableOptions().getProjectId());
- assertEquals("cluster", read.getBigtableOptions().getClusterId());
- assertEquals("zone", read.getBigtableOptions().getZoneId());
- assertEquals("table", read.getTableId());
- }
-
- @Test
- public void testReadBuildsCorrectlyInDifferentOrder() {
- BigtableIO.Read read =
- BigtableIO.read().withTableId("table").withBigtableOptions(BIGTABLE_OPTIONS);
- assertEquals("project", read.getBigtableOptions().getProjectId());
- assertEquals("cluster", read.getBigtableOptions().getClusterId());
- assertEquals("zone", read.getBigtableOptions().getZoneId());
- assertEquals("table", read.getTableId());
- }
-
- @Test
- public void testWriteBuildsCorrectly() {
- BigtableIO.Write write =
- BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS).withTableId("table");
- assertEquals("table", write.getTableId());
- assertEquals("project", write.getBigtableOptions().getProjectId());
- assertEquals("zone", write.getBigtableOptions().getZoneId());
- assertEquals("cluster", write.getBigtableOptions().getClusterId());
- }
-
- @Test
- public void testWriteBuildsCorrectlyInDifferentOrder() {
- BigtableIO.Write write =
- BigtableIO.write().withTableId("table").withBigtableOptions(BIGTABLE_OPTIONS);
- assertEquals("cluster", write.getBigtableOptions().getClusterId());
- assertEquals("project", write.getBigtableOptions().getProjectId());
- assertEquals("zone", write.getBigtableOptions().getZoneId());
- assertEquals("table", write.getTableId());
- }
-
- @Test
- public void testWriteValidationFailsMissingTable() {
- BigtableIO.Write write = BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS);
-
- thrown.expect(IllegalArgumentException.class);
-
- write.validate(null);
- }
-
- @Test
- public void testWriteValidationFailsMissingOptions() {
- BigtableIO.Write write = BigtableIO.write().withTableId("table");
-
- thrown.expect(IllegalArgumentException.class);
-
- write.validate(null);
- }
-
- /** Helper function to make a single row mutation to be written. */
- private static KV<ByteString, Iterable<Mutation>> makeWrite(String key, String value) {
- ByteString rowKey = ByteString.copyFromUtf8(key);
- Iterable<Mutation> mutations =
- ImmutableList.of(
- Mutation.newBuilder()
- .setSetCell(SetCell.newBuilder().setValue(ByteString.copyFromUtf8(value)))
- .build());
- return KV.of(rowKey, mutations);
- }
-
- /** Helper function to make a single bad row mutation (no set cell). */
- private static KV<ByteString, Iterable<Mutation>> makeBadWrite(String key) {
- Iterable<Mutation> mutations = ImmutableList.of(Mutation.newBuilder().build());
- return KV.of(ByteString.copyFromUtf8(key), mutations);
- }
-
- /** Tests that when reading from a non-existent table, the read fails. */
- @Test
- public void testReadingFailsTableDoesNotExist() throws Exception {
- final String table = "TEST-TABLE";
-
- BigtableIO.Read read =
- BigtableIO.read()
- .withBigtableOptions(BIGTABLE_OPTIONS)
- .withTableId(table)
- .withBigtableService(service);
-
- // Exception will be thrown by read.validate() when read is applied.
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage(String.format("Table %s does not exist", table));
-
- TestPipeline.create().apply(read);
- }
-
- /** Tests that when reading from an empty table, the read succeeds. */
- @Test
- public void testReadingEmptyTable() throws Exception {
- final String table = "TEST-EMPTY-TABLE";
- service.createTable(table);
-
- TestPipeline p = TestPipeline.create();
- PCollection<Row> rows = p.apply(defaultRead.withTableId(table));
- PAssert.that(rows).empty();
-
- p.run();
- logged.verifyInfo(String.format("Closing reader after reading 0 records."));
- }
-
- /** Tests reading all rows from a table. */
- @Test
- public void testReading() throws Exception {
- final String table = "TEST-MANY-ROWS-TABLE";
- final int numRows = 1001;
- List<Row> testRows = makeTableData(table, numRows);
-
- TestPipeline p = TestPipeline.create();
- PCollection<Row> rows = p.apply(defaultRead.withTableId(table));
- PAssert.that(rows).containsInAnyOrder(testRows);
-
- p.run();
- logged.verifyInfo(String.format("Closing reader after reading %d records.", numRows));
- }
-
- /** A {@link Predicate} that a {@link Row Row's} key matches the given regex. */
- private static class KeyMatchesRegex implements Predicate<ByteString> {
- private final String regex;
-
- public KeyMatchesRegex(String regex) {
- this.regex = regex;
- }
-
- @Override
- public boolean apply(@Nullable ByteString input) {
- verifyNotNull(input, "input");
- return input.toStringUtf8().matches(regex);
- }
- }
-
- /** Tests reading all rows using a filter. */
- @Test
- public void testReadingWithFilter() throws Exception {
- final String table = "TEST-FILTER-TABLE";
- final int numRows = 1001;
- List<Row> testRows = makeTableData(table, numRows);
- String regex = ".*17.*";
- final KeyMatchesRegex keyPredicate = new KeyMatchesRegex(regex);
- Iterable<Row> filteredRows =
- Iterables.filter(
- testRows,
- new Predicate<Row>() {
- @Override
- public boolean apply(@Nullable Row input) {
- verifyNotNull(input, "input");
- return keyPredicate.apply(input.getKey());
- }
- });
-
- RowFilter filter =
- RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(regex)).build();
-
- TestPipeline p = TestPipeline.create();
- PCollection<Row> rows = p.apply(defaultRead.withTableId(table).withRowFilter(filter));
- PAssert.that(rows).containsInAnyOrder(filteredRows);
-
- p.run();
- }
-
- /**
- * Tests dynamic work rebalancing exhaustively.
- *
- * <p>Because this test runs so slowly, it is disabled by default. Re-run when changing the
- * {@link BigtableIO.Read} implementation.
- */
- @Ignore("Slow. Rerun when changing the implementation.")
- @Test
- public void testReadingSplitAtFractionExhaustive() throws Exception {
- final String table = "TEST-FEW-ROWS-SPLIT-EXHAUSTIVE-TABLE";
- final int numRows = 10;
- final int numSamples = 1;
- final long bytesPerRow = 1L;
- makeTableData(table, numRows);
- service.setupSampleRowKeys(table, numSamples, bytesPerRow);
-
- BigtableSource source =
- new BigtableSource(service, table, null, service.getTableRange(table), null);
- assertSplitAtFractionExhaustive(source, null);
- }
-
- /**
- * Unit tests of splitAtFraction.
- */
- @Test
- public void testReadingSplitAtFraction() throws Exception {
- final String table = "TEST-SPLIT-AT-FRACTION";
- final int numRows = 10;
- final int numSamples = 1;
- final long bytesPerRow = 1L;
- makeTableData(table, numRows);
- service.setupSampleRowKeys(table, numSamples, bytesPerRow);
-
- BigtableSource source =
- new BigtableSource(service, table, null, service.getTableRange(table), null);
- // With 0 items read, all split requests will fail.
- assertSplitAtFractionFails(source, 0, 0.1, null /* options */);
- assertSplitAtFractionFails(source, 0, 1.0, null /* options */);
- // With 1 items read, all split requests past 1/10th will succeed.
- assertSplitAtFractionSucceedsAndConsistent(source, 1, 0.333, null /* options */);
- assertSplitAtFractionSucceedsAndConsistent(source, 1, 0.666, null /* options */);
- // With 3 items read, all split requests past 3/10ths will succeed.
- assertSplitAtFractionFails(source, 3, 0.2, null /* options */);
- assertSplitAtFractionSucceedsAndConsistent(source, 3, 0.571, null /* options */);
- assertSplitAtFractionSucceedsAndConsistent(source, 3, 0.9, null /* options */);
- // With 6 items read, all split requests past 6/10ths will succeed.
- assertSplitAtFractionFails(source, 6, 0.5, null /* options */);
- assertSplitAtFractionSucceedsAndConsistent(source, 6, 0.7, null /* options */);
- }
-
- /** Tests reading all rows from a split table. */
- @Test
- public void testReadingWithSplits() throws Exception {
- final String table = "TEST-MANY-ROWS-SPLITS-TABLE";
- final int numRows = 1500;
- final int numSamples = 10;
- final long bytesPerRow = 100L;
-
- // Set up test table data and sample row keys for size estimation and splitting.
- makeTableData(table, numRows);
- service.setupSampleRowKeys(table, numSamples, bytesPerRow);
-
- // Generate source and split it.
- BigtableSource source =
- new BigtableSource(service, table, null /*filter*/, ByteKeyRange.ALL_KEYS, null /*size*/);
- List<BigtableSource> splits =
- source.splitIntoBundles(numRows * bytesPerRow / numSamples, null /* options */);
-
- // Test num splits and split equality.
- assertThat(splits, hasSize(numSamples));
- assertSourcesEqualReferenceSource(source, splits, null /* options */);
- }
-
- /** Tests reading all rows from a sub-split table. */
- @Test
- public void testReadingWithSubSplits() throws Exception {
- final String table = "TEST-MANY-ROWS-SPLITS-TABLE";
- final int numRows = 1000;
- final int numSamples = 10;
- final int numSplits = 20;
- final long bytesPerRow = 100L;
-
- // Set up test table data and sample row keys for size estimation and splitting.
- makeTableData(table, numRows);
- service.setupSampleRowKeys(table, numSamples, bytesPerRow);
-
- // Generate source and split it.
- BigtableSource source =
- new BigtableSource(service, table, null /*filter*/, ByteKeyRange.ALL_KEYS, null /*size*/);
- List<BigtableSource> splits = source.splitIntoBundles(numRows * bytesPerRow / numSplits, null);
-
- // Test num splits and split equality.
- assertThat(splits, hasSize(numSplits));
- assertSourcesEqualReferenceSource(source, splits, null /* options */);
- }
-
- /** Tests reading all rows from a sub-split table. */
- @Test
- public void testReadingWithFilterAndSubSplits() throws Exception {
- final String table = "TEST-FILTER-SUB-SPLITS";
- final int numRows = 1700;
- final int numSamples = 10;
- final int numSplits = 20;
- final long bytesPerRow = 100L;
-
- // Set up test table data and sample row keys for size estimation and splitting.
- makeTableData(table, numRows);
- service.setupSampleRowKeys(table, numSamples, bytesPerRow);
-
- // Generate source and split it.
- RowFilter filter =
- RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(".*17.*")).build();
- BigtableSource source =
- new BigtableSource(service, table, filter, ByteKeyRange.ALL_KEYS, null /*size*/);
- List<BigtableSource> splits = source.splitIntoBundles(numRows * bytesPerRow / numSplits, null);
-
- // Test num splits and split equality.
- assertThat(splits, hasSize(numSplits));
- assertSourcesEqualReferenceSource(source, splits, null /* options */);
- }
-
- @Test
- public void testReadingDisplayData() {
- RowFilter rowFilter = RowFilter.newBuilder()
- .setRowKeyRegexFilter(ByteString.copyFromUtf8("foo.*"))
- .build();
-
- BigtableIO.Read read = BigtableIO.read()
- .withBigtableOptions(BIGTABLE_OPTIONS)
- .withTableId("fooTable")
- .withRowFilter(rowFilter);
-
- DisplayData displayData = DisplayData.from(read);
-
- assertThat(displayData, hasDisplayItem("tableId", "fooTable"));
- assertThat(displayData, hasDisplayItem("rowFilter", rowFilter.toString()));
-
- // BigtableIO adds user-agent to options; assert only on key and not value.
- assertThat(displayData, hasDisplayItem(hasKey("bigtableOptions")));
- }
-
- /** Tests that a record gets written to the service and messages are logged. */
- @Test
- public void testWriting() throws Exception {
- final String table = "table";
- final String key = "key";
- final String value = "value";
-
- service.createTable(table);
-
- TestPipeline p = TestPipeline.create();
- p.apply("single row", Create.of(makeWrite(key, value)).withCoder(bigtableCoder))
- .apply("write", defaultWrite.withTableId(table));
- p.run();
-
- logged.verifyInfo("Wrote 1 records");
-
- assertEquals(1, service.tables.size());
- assertNotNull(service.getTable(table));
- Map<ByteString, ByteString> rows = service.getTable(table);
- assertEquals(1, rows.size());
- assertEquals(ByteString.copyFromUtf8(value), rows.get(ByteString.copyFromUtf8(key)));
- }
-
- /** Tests that when writing to a non-existent table, the write fails. */
- @Test
- public void testWritingFailsTableDoesNotExist() throws Exception {
- final String table = "TEST-TABLE";
-
- PCollection<KV<ByteString, Iterable<Mutation>>> emptyInput =
- TestPipeline.create().apply(Create.<KV<ByteString, Iterable<Mutation>>>of());
-
- // Exception will be thrown by write.validate() when write is applied.
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage(String.format("Table %s does not exist", table));
-
- emptyInput.apply("write", defaultWrite.withTableId(table));
- }
-
- /** Tests that when writing an element fails, the write fails. */
- @Test
- public void testWritingFailsBadElement() throws Exception {
- final String table = "TEST-TABLE";
- final String key = "KEY";
- service.createTable(table);
-
- TestPipeline p = TestPipeline.create();
- p.apply(Create.of(makeBadWrite(key)).withCoder(bigtableCoder))
- .apply(defaultWrite.withTableId(table));
-
- thrown.expect(PipelineExecutionException.class);
- thrown.expectCause(Matchers.<Throwable>instanceOf(IOException.class));
- thrown.expectMessage("At least 1 errors occurred writing to Bigtable. First 1 errors:");
- thrown.expectMessage("Error mutating row " + key + " with mutations []: cell value missing");
- p.run();
- }
-
- @Test
- public void testWritingDisplayData() {
- BigtableIO.Write write = BigtableIO.write()
- .withTableId("fooTable")
- .withBigtableOptions(BIGTABLE_OPTIONS);
-
- DisplayData displayData = DisplayData.from(write);
-
- assertThat(displayData, hasDisplayItem("tableId", "fooTable"));
- // BigtableIO adds user-agent to options; assert only on key and not value.
- assertThat(displayData, hasDisplayItem(hasKey("bigtableOptions")));
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////
- private static final String COLUMN_FAMILY_NAME = "family";
- private static final ByteString COLUMN_NAME = ByteString.copyFromUtf8("column");
- private static final Column TEST_COLUMN = Column.newBuilder().setQualifier(COLUMN_NAME).build();
- private static final Family TEST_FAMILY = Family.newBuilder().setName(COLUMN_FAMILY_NAME).build();
-
- /** Helper function that builds a {@link Row} in a test table that could be returned by read. */
- private static Row makeRow(ByteString key, ByteString value) {
- // Build the currentRow and return true.
- Column.Builder newColumn = TEST_COLUMN.toBuilder().addCells(Cell.newBuilder().setValue(value));
- return Row.newBuilder()
- .setKey(key)
- .addFamilies(TEST_FAMILY.toBuilder().addColumns(newColumn))
- .build();
- }
-
- /** Helper function to create a table and return the rows that it created. */
- private static List<Row> makeTableData(String tableId, int numRows) {
- service.createTable(tableId);
- Map<ByteString, ByteString> testData = service.getTable(tableId);
-
- List<Row> testRows = new ArrayList<>(numRows);
- for (int i = 0; i < numRows; ++i) {
- ByteString key = ByteString.copyFromUtf8(String.format("key%09d", i));
- ByteString value = ByteString.copyFromUtf8(String.format("value%09d", i));
- testData.put(key, value);
- testRows.add(makeRow(key, value));
- }
-
- return testRows;
- }
-
-
- /**
- * A {@link BigtableService} implementation that stores tables and their contents in memory.
- */
- private static class FakeBigtableService implements BigtableService {
- private final Map<String, SortedMap<ByteString, ByteString>> tables = new HashMap<>();
- private final Map<String, List<SampleRowKeysResponse>> sampleRowKeys = new HashMap<>();
-
- @Nullable
- public SortedMap<ByteString, ByteString> getTable(String tableId) {
- return tables.get(tableId);
- }
-
- public ByteKeyRange getTableRange(String tableId) {
- verifyTableExists(tableId);
- SortedMap<ByteString, ByteString> data = tables.get(tableId);
- return ByteKeyRange.of(ByteKey.of(data.firstKey()), ByteKey.of(data.lastKey()));
- }
-
- public void createTable(String tableId) {
- tables.put(tableId, new TreeMap<ByteString, ByteString>(new ByteStringComparator()));
- }
-
- @Override
- public boolean tableExists(String tableId) {
- return tables.containsKey(tableId);
- }
-
- public void verifyTableExists(String tableId) {
- checkArgument(tableExists(tableId), "Table %s does not exist", tableId);
- }
-
- @Override
- public FakeBigtableReader createReader(BigtableSource source) {
- return new FakeBigtableReader(source);
- }
-
- @Override
- public FakeBigtableWriter openForWriting(String tableId) {
- return new FakeBigtableWriter(tableId);
- }
-
- @Override
- public List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) {
- List<SampleRowKeysResponse> samples = sampleRowKeys.get(source.getTableId());
- checkArgument(samples != null, "No samples found for table %s", source.getTableId());
- return samples;
- }
-
- /** Sets up the sample row keys for the specified table. */
- void setupSampleRowKeys(String tableId, int numSamples, long bytesPerRow) {
- verifyTableExists(tableId);
- checkArgument(numSamples > 0, "Number of samples must be positive: %s", numSamples);
- checkArgument(bytesPerRow > 0, "Bytes/Row must be positive: %s", bytesPerRow);
-
- ImmutableList.Builder<SampleRowKeysResponse> ret = ImmutableList.builder();
- SortedMap<ByteString, ByteString> rows = getTable(tableId);
- int currentSample = 1;
- int rowsSoFar = 0;
- for (Map.Entry<ByteString, ByteString> entry : rows.entrySet()) {
- if (((double) rowsSoFar) / rows.size() >= ((double) currentSample) / numSamples) {
- // add the sample with the total number of bytes in the table before this key.
- ret.add(
- SampleRowKeysResponse.newBuilder()
- .setRowKey(entry.getKey())
- .setOffsetBytes(rowsSoFar * bytesPerRow)
- .build());
- // Move on to next sample
- currentSample++;
- }
- ++rowsSoFar;
- }
-
- // Add the last sample indicating the end of the table, with all rows before it.
- ret.add(SampleRowKeysResponse.newBuilder().setOffsetBytes(rows.size() * bytesPerRow).build());
- sampleRowKeys.put(tableId, ret.build());
- }
- }
-
- /**
- * A {@link BigtableService.Reader} implementation that reads from the static instance of
- * {@link FakeBigtableService} stored in {@link #service}.
- *
- * <p>This reader does not support {@link RowFilter} objects.
- */
- private static class FakeBigtableReader implements BigtableService.Reader {
- private final BigtableSource source;
- private Iterator<Map.Entry<ByteString, ByteString>> rows;
- private Row currentRow;
- private Predicate<ByteString> filter;
-
- public FakeBigtableReader(BigtableSource source) {
- this.source = source;
- if (source.getRowFilter() == null) {
- filter = Predicates.alwaysTrue();
- } else {
- ByteString keyRegex = source.getRowFilter().getRowKeyRegexFilter();
- checkArgument(!keyRegex.isEmpty(), "Only RowKeyRegexFilter is supported");
- filter = new KeyMatchesRegex(keyRegex.toStringUtf8());
- }
- service.verifyTableExists(source.getTableId());
- }
-
- @Override
- public boolean start() {
- rows = service.tables.get(source.getTableId()).entrySet().iterator();
- return advance();
- }
-
- @Override
- public boolean advance() {
- // Loop until we find a row in range, or reach the end of the iterator.
- Map.Entry<ByteString, ByteString> entry = null;
- while (rows.hasNext()) {
- entry = rows.next();
- if (!filter.apply(entry.getKey())
- || !source.getRange().containsKey(ByteKey.of(entry.getKey()))) {
- // Does not match row filter or does not match source range. Skip.
- entry = null;
- continue;
- }
- // Found a row inside this source's key range, stop.
- break;
- }
-
- // Return false if no more rows.
- if (entry == null) {
- currentRow = null;
- return false;
- }
-
- // Set the current row and return true.
- currentRow = makeRow(entry.getKey(), entry.getValue());
- return true;
- }
-
- @Override
- public Row getCurrentRow() {
- if (currentRow == null) {
- throw new NoSuchElementException();
- }
- return currentRow;
- }
-
- @Override
- public void close() {
- rows = null;
- currentRow = null;
- }
- }
-
- /**
- * A {@link BigtableService.Writer} implementation that writes to the static instance of
- * {@link FakeBigtableService} stored in {@link #service}.
- *
- * <p>This writer only supports {@link Mutation Mutations} that consist only of {@link SetCell}
- * entries. The column family in the {@link SetCell} is ignored; only the value is used.
- *
- * <p>When no {@link SetCell} is provided, the write will fail and this will be exposed via an
- * exception on the returned {@link ListenableFuture}.
- */
- private static class FakeBigtableWriter implements BigtableService.Writer {
- private final String tableId;
-
- public FakeBigtableWriter(String tableId) {
- this.tableId = tableId;
- }
-
- @Override
- public ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record) {
- service.verifyTableExists(tableId);
- Map<ByteString, ByteString> table = service.getTable(tableId);
- ByteString key = record.getKey();
- for (Mutation m : record.getValue()) {
- SetCell cell = m.getSetCell();
- if (cell.getValue().isEmpty()) {
- return Futures.immediateFailedCheckedFuture(new IOException("cell value missing"));
- }
- table.put(key, cell.getValue());
- }
- return Futures.immediateFuture(Empty.getDefaultInstance());
- }
-
- @Override
- public void close() {}
- }
-
- /** A serializable comparator for ByteString. Used to make row samples. */
- private static final class ByteStringComparator implements Comparator<ByteString>, Serializable {
- @Override
- public int compare(ByteString o1, ByteString o2) {
- return ByteKey.of(o1).compareTo(ByteKey.of(o2));
- }
- }
-}
[3/3] incubator-beam git commit: Closes #97
Posted by dh...@apache.org.
Closes #97
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aa43ec0b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aa43ec0b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aa43ec0b
Branch: refs/heads/master
Commit: aa43ec0b041b21a49601efcb3699456b52064f69
Parents: 9746f0d d138ae5
Author: Dan Halperin <dh...@google.com>
Authored: Tue Apr 26 14:08:55 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 26 14:08:55 2016 -0700
----------------------------------------------------------------------
pom.xml | 1 -
sdks/java/core/pom.xml | 25 -
.../apache/beam/sdk/io/bigtable/BigtableIO.java | 1016 ------------------
.../beam/sdk/io/bigtable/BigtableService.java | 111 --
.../sdk/io/bigtable/BigtableServiceImpl.java | 244 -----
.../beam/sdk/io/bigtable/package-info.java | 23 -
.../beam/sdk/io/bigtable/BigtableIOTest.java | 729 -------------
sdks/java/io/google-cloud-platform/pom.xml | 96 ++
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 1016 ++++++++++++++++++
.../sdk/io/gcp/bigtable/BigtableService.java | 111 ++
.../io/gcp/bigtable/BigtableServiceImpl.java | 244 +++++
.../beam/sdk/io/gcp/bigtable/package-info.java | 23 +
.../sdk/io/gcp/bigtable/BigtableIOTest.java | 729 +++++++++++++
sdks/java/io/pom.xml | 1 +
14 files changed, 2220 insertions(+), 2149 deletions(-)
----------------------------------------------------------------------