You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/26 23:23:11 UTC

[1/3] incubator-beam git commit: [BEAM-77] Move bigtable in IO

Repository: incubator-beam
Updated Branches:
  refs/heads/master 9746f0d1d -> aa43ec0b0


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
new file mode 100644
index 0000000..d1d5cd6
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>io-parent</artifactId>
+    <version>0.1.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>google-cloud-platform</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: Google Cloud Platform</name>
+  <description>IO library to read and write Google Cloud Platform systems from Beam.</description>
+  <packaging>jar</packaging>
+
+  <properties>
+    <bigtable.version>0.2.3</bigtable.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>java-sdk-all</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-all</artifactId>
+      <version>0.12.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.cloud.bigtable</groupId>
+      <artifactId>bigtable-protos</artifactId>
+      <version>${bigtable.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.cloud.bigtable</groupId>
+      <artifactId>bigtable-client-core</artifactId>
+      <version>${bigtable.version}</version>
+    </dependency>
+
+    <!-- test -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>java-sdk-all</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <version>1.3</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.11</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <version>1.7.14</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
new file mode 100644
index 0000000..f99eb1d
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -0,0 +1,1016 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.Sink.WriteOperation;
+import org.apache.beam.sdk.io.Sink.Writer;
+import org.apache.beam.sdk.io.range.ByteKey;
+import org.apache.beam.sdk.io.range.ByteKeyRange;
+import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.ReleaseInfo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+import com.google.bigtable.v1.Mutation;
+import com.google.bigtable.v1.Row;
+import com.google.bigtable.v1.RowFilter;
+import com.google.bigtable.v1.SampleRowKeysResponse;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Empty;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import javax.annotation.Nullable;
+
+/**
+ * A bounded source and sink for Google Cloud Bigtable.
+ *
+ * <p>For more information, see the online documentation at
+ * <a href="https://cloud.google.com/bigtable/">Google Cloud Bigtable</a>.
+ *
+ * <h3>Reading from Cloud Bigtable</h3>
+ *
+ * <p>The Bigtable source returns a set of rows from a single table, returning a
+ * {@code PCollection<Row>}.
+ *
+ * <p>To configure a Cloud Bigtable source, you must supply a table id and a {@link BigtableOptions}
+ * or builder configured with the project and other information necessary to identify the
+ * Bigtable cluster. A {@link RowFilter} may also optionally be specified using
+ * {@link BigtableIO.Read#withRowFilter}. For example:
+ *
+ * <pre>{@code
+ * BigtableOptions.Builder optionsBuilder =
+ *     new BigtableOptions.Builder()
+ *         .setProjectId("project")
+ *         .setClusterId("cluster")
+ *         .setZoneId("zone");
+ *
+ * Pipeline p = ...;
+ *
+ * // Scan the entire table.
+ * p.apply("read",
+ *     BigtableIO.read()
+ *         .withBigtableOptions(optionsBuilder)
+ *         .withTableId("table"));
+ *
+ * // Scan a subset of rows that match the specified row filter.
+ * p.apply("filtered read",
+ *     BigtableIO.read()
+ *         .withBigtableOptions(optionsBuilder)
+ *         .withTableId("table")
+ *         .withRowFilter(filter));
+ * }</pre>
+ *
+ * <h3>Writing to Cloud Bigtable</h3>
+ *
+ * <p>The Bigtable sink executes a set of row mutations on a single table. It takes as input a
+ * {@link PCollection PCollection&lt;KV&lt;ByteString, Iterable&lt;Mutation&gt;&gt;&gt;}, where the
+ * {@link ByteString} is the key of the row being mutated, and each {@link Mutation} represents an
+ * idempotent transformation to that row.
+ *
+ * <p>To configure a Cloud Bigtable sink, you must supply a table id and a {@link BigtableOptions}
+ * or builder configured with the project and other information necessary to identify the
+ * Bigtable cluster, for example:
+ *
+ * <pre>{@code
+ * BigtableOptions.Builder optionsBuilder =
+ *     new BigtableOptions.Builder()
+ *         .setProjectId("project")
+ *         .setClusterId("cluster")
+ *         .setZoneId("zone");
+ *
+ * PCollection<KV<ByteString, Iterable<Mutation>>> data = ...;
+ *
+ * data.apply("write",
+ *     BigtableIO.write()
+ *         .withBigtableOptions(optionsBuilder)
+ *         .withTableId("table"));
+ * }</pre>
+ *
+ * <h3>Experimental</h3>
+ *
+ * <p>This connector for Cloud Bigtable is considered experimental and may break or receive
+ * backwards-incompatible changes in future versions of the Apache Beam SDK. Cloud Bigtable is
+ * in Beta, and thus it may introduce breaking changes in future revisions of its service or APIs.
+ *
+ * <h3>Permissions</h3>
+ *
+ * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
+ * pipeline. Please refer to the documentation of the corresponding
+ * {@link PipelineRunner PipelineRunners} for more details.
+ */
+@Experimental
+public class BigtableIO {
+  private static final Logger logger = LoggerFactory.getLogger(BigtableIO.class);
+
+  /**
+   * Entry point for reading. The returned {@link BigtableIO.Read} is not usable yet: the caller
+   * still has to name the source cluster via
+   * {@link BigtableIO.Read#withBigtableOptions(BigtableOptions) BigtableOptions} and the table to
+   * scan via {@link BigtableIO.Read#withTableId tableId}. Rows may additionally be narrowed with
+   * {@link BigtableIO.Read#withRowFilter}.
+   */
+  @Experimental
+  public static Read read() {
+    // Start with no options, an empty (unset) table id, no filter and no injected service.
+    final String unsetTableId = "";
+    return new Read(null, unsetTableId, null, null);
+  }
+
+  /**
+   * Entry point for writing. The returned {@link BigtableIO.Write} is not usable yet: the caller
+   * still has to name the destination cluster via
+   * {@link BigtableIO.Write#withBigtableOptions(BigtableOptions) BigtableOptions} and the table
+   * to mutate via {@link BigtableIO.Write#withTableId tableId}.
+   */
+  @Experimental
+  public static Write write() {
+    // Start with no options, an empty (unset) table id and no injected service.
+    final String unsetTableId = "";
+    return new Write(null, unsetTableId, null);
+  }
+
+  /**
+   * A {@link PTransform} that reads from Google Cloud Bigtable. See the class-level Javadoc on
+   * {@link BigtableIO} for more information.
+   *
+   * @see BigtableIO
+   */
+  @Experimental
+  public static class Read extends PTransform<PBegin, PCollection<Row>> {
+    /**
+     * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster
+     * indicated by the given options, and using any other specified customizations.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read withBigtableOptions(BigtableOptions options) {
+      checkNotNull(options, "options");
+      return withBigtableOptions(options.toBuilder());
+    }
+
+    /**
+     * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster
+     * indicated by the given options, and using any other specified customizations.
+     *
+     * <p>Clones the given {@link BigtableOptions} builder so that any further changes
+     * will have no effect on the returned {@link BigtableIO.Read}.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
+      checkNotNull(optionsBuilder, "optionsBuilder");
+      // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
+      BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
+      BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
+      return new Read(optionsWithAgent, tableId, filter, bigtableService);
+    }
+
+    /**
+     * Returns a new {@link BigtableIO.Read} that will filter the rows read from Cloud Bigtable
+     * using the given row filter.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read withRowFilter(RowFilter filter) {
+      checkNotNull(filter, "filter");
+      return new Read(options, tableId, filter, bigtableService);
+    }
+
+    /**
+     * Returns a new {@link BigtableIO.Read} that will read from the specified table.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read withTableId(String tableId) {
+      checkNotNull(tableId, "tableId");
+      return new Read(options, tableId, filter, bigtableService);
+    }
+
+    /**
+     * Returns the Google Cloud Bigtable cluster being read from, and other parameters.
+     */
+    public BigtableOptions getBigtableOptions() {
+      return options;
+    }
+
+    /**
+     * Returns the table being read from.
+     */
+    public String getTableId() {
+      return tableId;
+    }
+
+    @Override
+    public PCollection<Row> apply(PBegin input) {
+      BigtableSource source =
+          new BigtableSource(getBigtableService(), tableId, filter, ByteKeyRange.ALL_KEYS, null);
+      return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
+    }
+
+    @Override
+    public void validate(PBegin input) {
+      checkArgument(options != null, "BigtableOptions not specified");
+      checkArgument(!tableId.isEmpty(), "Table ID not specified");
+      try {
+        checkArgument(
+            getBigtableService().tableExists(tableId), "Table %s does not exist", tableId);
+      } catch (IOException e) {
+        logger.warn("Error checking whether table {} exists; proceeding.", tableId, e);
+      }
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+
+      builder.add("tableId", tableId);
+
+      if (options != null) {
+        builder.add("bigtableOptions", options.toString());
+      }
+
+      if (filter != null) {
+        builder.add("rowFilter", filter.toString());
+      }
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(Read.class)
+          .add("options", options)
+          .add("tableId", tableId)
+          .add("filter", filter)
+          .toString();
+    }
+
+    /////////////////////////////////////////////////////////////////////////////////////////
+    /**
+     * Used to define the Cloud Bigtable cluster and any options for the networking layer.
+     * Cannot actually be {@code null} at validation time, but may start out {@code null} while
+     * source is being built.
+     */
+    @Nullable private final BigtableOptions options;
+    private final String tableId;
+    @Nullable private final RowFilter filter;
+    @Nullable private final BigtableService bigtableService;
+
+    private Read(
+        @Nullable BigtableOptions options,
+        String tableId,
+        @Nullable RowFilter filter,
+        @Nullable BigtableService bigtableService) {
+      this.options = options;
+      this.tableId = checkNotNull(tableId, "tableId");
+      this.filter = filter;
+      this.bigtableService = bigtableService;
+    }
+
+    /**
+     * Returns a new {@link BigtableIO.Read} that will read using the given Cloud Bigtable
+     * service implementation.
+     *
+     * <p>This is used for testing.
+     *
+     * <p>Does not modify this object.
+     */
+    Read withBigtableService(BigtableService bigtableService) {
+      checkNotNull(bigtableService, "bigtableService");
+      return new Read(options, tableId, filter, bigtableService);
+    }
+
+    /**
+     * Helper function that either returns the mock Bigtable service supplied by
+     * {@link #withBigtableService} or creates and returns an implementation that talks to
+     * {@code Cloud Bigtable}.
+     */
+    private BigtableService getBigtableService() {
+      if (bigtableService != null) {
+        return bigtableService;
+      }
+      return new BigtableServiceImpl(options);
+    }
+  }
+
+  /**
+   * {@link PTransform} that applies batches of row mutations to a single Cloud Bigtable table.
+   * Each input element pairs a row key with the mutations for that row. Instances are
+   * immutable; every {@code with*} method returns a modified copy. See the class-level Javadoc on
+   * {@link BigtableIO} for usage examples.
+   *
+   * @see BigtableIO
+   */
+  @Experimental
+  public static class Write
+      extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, PDone> {
+    /**
+     * Identifies the Cloud Bigtable cluster and carries networking options. It is {@code null}
+     * while the transform is still being configured; {@link #validate} rejects a {@code null}
+     * value.
+     */
+    @Nullable private final BigtableOptions options;
+
+    /** Table to mutate; the empty string means "not configured yet". */
+    private final String tableId;
+
+    /** Test-only service override; when {@code null} a {@link BigtableServiceImpl} is built. */
+    @Nullable private final BigtableService bigtableService;
+
+    private Write(
+        @Nullable BigtableOptions options,
+        String tableId,
+        @Nullable BigtableService bigtableService) {
+      this.tableId = checkNotNull(tableId, "tableId");
+      this.options = options;
+      this.bigtableService = bigtableService;
+    }
+
+    /**
+     * Returns a copy of this transform that writes to the cluster described by {@code options},
+     * keeping every other setting.
+     *
+     * <p>This object is left unchanged.
+     */
+    public Write withBigtableOptions(BigtableOptions options) {
+      BigtableOptions nonNullOptions = checkNotNull(options, "options");
+      return withBigtableOptions(nonNullOptions.toBuilder());
+    }
+
+    /**
+     * Returns a copy of this transform that writes to the cluster described by
+     * {@code optionsBuilder}, keeping every other setting.
+     *
+     * <p>The builder is snapshotted, so mutating it afterwards does not affect the returned
+     * {@link BigtableIO.Write}. This object is left unchanged.
+     */
+    public Write withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
+      checkNotNull(optionsBuilder, "optionsBuilder");
+      // Building and converting back yields a private builder detached from the caller's one.
+      // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
+      BigtableOptions snapshot = optionsBuilder.build();
+      BigtableOptions withAgent = snapshot.toBuilder().setUserAgent(getUserAgent()).build();
+      return new Write(withAgent, tableId, bigtableService);
+    }
+
+    /**
+     * Returns a copy of this transform that writes to {@code tableId}.
+     *
+     * <p>This object is left unchanged.
+     */
+    public Write withTableId(String tableId) {
+      String checkedTableId = checkNotNull(tableId, "tableId");
+      return new Write(options, checkedTableId, bigtableService);
+    }
+
+    /**
+     * Returns a copy of this transform that talks to {@code bigtableService} instead of a real
+     * Cloud Bigtable connection. Intended for tests.
+     *
+     * <p>This object is left unchanged.
+     */
+    Write withBigtableService(BigtableService bigtableService) {
+      BigtableService checkedService = checkNotNull(bigtableService, "bigtableService");
+      return new Write(options, tableId, checkedService);
+    }
+
+    /** Returns the options identifying the destination cluster (may be {@code null} if unset). */
+    public BigtableOptions getBigtableOptions() {
+      return options;
+    }
+
+    /** Returns the id of the table being written. */
+    public String getTableId() {
+      return tableId;
+    }
+
+    @Override
+    public PDone apply(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
+      return input.apply(
+          org.apache.beam.sdk.io.Write.to(new Sink(tableId, getBigtableService())));
+    }
+
+    @Override
+    public void validate(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
+      checkArgument(options != null, "BigtableOptions not specified");
+      checkArgument(!tableId.isEmpty(), "Table ID not specified");
+      boolean exists;
+      try {
+        exists = getBigtableService().tableExists(tableId);
+      } catch (IOException e) {
+        // The existence check is best-effort: an I/O failure is logged, not propagated.
+        logger.warn("Error checking whether table {} exists; proceeding.", tableId, e);
+        return;
+      }
+      checkArgument(exists, "Table %s does not exist", tableId);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add("tableId", tableId);
+      if (options != null) {
+        builder.add("bigtableOptions", options.toString());
+      }
+    }
+
+    @Override
+    public String toString() {
+      MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(Write.class);
+      helper.add("options", options);
+      helper.add("tableId", tableId);
+      return helper.toString();
+    }
+
+    /**
+     * Returns the injected service from {@link #withBigtableService} if present, otherwise a new
+     * {@link BigtableServiceImpl} built from {@link #options}.
+     */
+    private BigtableService getBigtableService() {
+      return bigtableService != null ? bigtableService : new BigtableServiceImpl(options);
+    }
+  }
+
+  //////////////////////////////////////////////////////////////////////////////////////////
+  /** Not instantiable: this class only groups static factories and nested transform types. */
+  private BigtableIO() {
+    // Intentionally empty.
+  }
+
+  static class BigtableSource extends BoundedSource<Row> {
+    public BigtableSource(
+        BigtableService service,
+        String tableId,
+        @Nullable RowFilter filter,
+        ByteKeyRange range,
+        Long estimatedSizeBytes) {
+      this.service = service;
+      this.tableId = tableId;
+      this.filter = filter;
+      this.range = range;
+      this.estimatedSizeBytes = estimatedSizeBytes;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(BigtableSource.class)
+          .add("tableId", tableId)
+          .add("filter", filter)
+          .add("range", range)
+          .add("estimatedSizeBytes", estimatedSizeBytes)
+          .toString();
+    }
+
+    ////// Private state and internal implementation details //////
+    private final BigtableService service;
+    @Nullable private final String tableId;
+    @Nullable private final RowFilter filter;
+    private final ByteKeyRange range;
+    @Nullable private Long estimatedSizeBytes;
+    @Nullable private transient List<SampleRowKeysResponse> sampleRowKeys;
+
+    protected BigtableSource withStartKey(ByteKey startKey) {
+      checkNotNull(startKey, "startKey");
+      return new BigtableSource(
+          service, tableId, filter, range.withStartKey(startKey), estimatedSizeBytes);
+    }
+
+    protected BigtableSource withEndKey(ByteKey endKey) {
+      checkNotNull(endKey, "endKey");
+      return new BigtableSource(
+          service, tableId, filter, range.withEndKey(endKey), estimatedSizeBytes);
+    }
+
+    protected BigtableSource withEstimatedSizeBytes(Long estimatedSizeBytes) {
+      checkNotNull(estimatedSizeBytes, "estimatedSizeBytes");
+      return new BigtableSource(service, tableId, filter, range, estimatedSizeBytes);
+    }
+
+    /**
+     * Makes an API call to the Cloud Bigtable service that gives information about tablet key
+     * boundaries and estimated sizes. We can use these samples to ensure that splits are on
+     * different tablets, and possibly generate sub-splits within tablets.
+     */
+    private List<SampleRowKeysResponse> getSampleRowKeys() throws IOException {
+      return service.getSampleRowKeys(this);
+    }
+
+    @Override
+    public List<BigtableSource> splitIntoBundles(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      // Update the desiredBundleSizeBytes in order to limit the
+      // number of splits to maximumNumberOfSplits.
+      long maximumNumberOfSplits = 4000;
+      long sizeEstimate = getEstimatedSizeBytes(options);
+      desiredBundleSizeBytes =
+          Math.max(sizeEstimate / maximumNumberOfSplits, desiredBundleSizeBytes);
+
+      // Delegate to testable helper.
+      return splitIntoBundlesBasedOnSamples(desiredBundleSizeBytes, getSampleRowKeys());
+    }
+
+    /** Helper that splits this source into bundles based on Cloud Bigtable sampled row keys. */
+    private List<BigtableSource> splitIntoBundlesBasedOnSamples(
+        long desiredBundleSizeBytes, List<SampleRowKeysResponse> sampleRowKeys) {
+      // There are no regions, or no samples available. Just scan the entire range.
+      if (sampleRowKeys.isEmpty()) {
+        logger.info("Not splitting source {} because no sample row keys are available.", this);
+        return Collections.singletonList(this);
+      }
+
+      logger.info(
+          "About to split into bundles of size {} with sampleRowKeys length {} first element {}",
+          desiredBundleSizeBytes,
+          sampleRowKeys.size(),
+          sampleRowKeys.get(0));
+
+      // Loop through all sampled responses and generate splits from the ones that overlap the
+      // scan range. The main complication is that we must track the end range of the previous
+      // sample to generate good ranges.
+      ByteKey lastEndKey = ByteKey.EMPTY;
+      long lastOffset = 0;
+      ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder();
+      for (SampleRowKeysResponse response : sampleRowKeys) {
+        ByteKey responseEndKey = ByteKey.of(response.getRowKey());
+        long responseOffset = response.getOffsetBytes();
+        checkState(
+            responseOffset >= lastOffset,
+            "Expected response byte offset %s to come after the last offset %s",
+            responseOffset,
+            lastOffset);
+
+        if (!range.overlaps(ByteKeyRange.of(lastEndKey, responseEndKey))) {
+          // This region does not overlap the scan, so skip it.
+          lastOffset = responseOffset;
+          lastEndKey = responseEndKey;
+          continue;
+        }
+
+        // Calculate the beginning of the split as the larger of startKey and the end of the last
+        // split. Unspecified start is smallest key so is correctly treated as earliest key.
+        ByteKey splitStartKey = lastEndKey;
+        if (splitStartKey.compareTo(range.getStartKey()) < 0) {
+          splitStartKey = range.getStartKey();
+        }
+
+        // Calculate the end of the split as the smaller of endKey and the end of this sample. Note
+        // that range.containsKey handles the case when range.getEndKey() is empty.
+        ByteKey splitEndKey = responseEndKey;
+        if (!range.containsKey(splitEndKey)) {
+          splitEndKey = range.getEndKey();
+        }
+
+        // We know this region overlaps the desired key range, and we know a rough estimate of its
+        // size. Split the key range into bundle-sized chunks and then add them all as splits.
+        long sampleSizeBytes = responseOffset - lastOffset;
+        List<BigtableSource> subSplits =
+            splitKeyRangeIntoBundleSizedSubranges(
+                sampleSizeBytes,
+                desiredBundleSizeBytes,
+                ByteKeyRange.of(splitStartKey, splitEndKey));
+        splits.addAll(subSplits);
+
+        // Move to the next region.
+        lastEndKey = responseEndKey;
+        lastOffset = responseOffset;
+      }
+
+      // We must add one more region after the end of the samples if both these conditions hold:
+      //  1. we did not scan to the end yet (lastEndKey is concrete, not 0-length).
+      //  2. we want to scan to the end (endKey is empty) or farther (lastEndKey < endKey).
+      if (!lastEndKey.isEmpty()
+          && (range.getEndKey().isEmpty() || lastEndKey.compareTo(range.getEndKey()) < 0)) {
+        splits.add(this.withStartKey(lastEndKey).withEndKey(range.getEndKey()));
+      }
+
+      List<BigtableSource> ret = splits.build();
+      logger.info("Generated {} splits. First split: {}", ret.size(), ret.get(0));
+      return ret;
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
+      // Delegate to testable helper.
+      if (estimatedSizeBytes == null) {
+        estimatedSizeBytes = getEstimatedSizeBytesBasedOnSamples(getSampleRowKeys());
+      }
+      return estimatedSizeBytes;
+    }
+
+    /**
+     * Computes the estimated size in bytes based on the total size of all samples that overlap
+     * the key range this source will scan.
+     */
+    private long getEstimatedSizeBytesBasedOnSamples(List<SampleRowKeysResponse> samples) {
+      long estimatedSizeBytes = 0;
+      long lastOffset = 0;
+      ByteKey currentStartKey = ByteKey.EMPTY;
+      // Compute the total estimated size as the size of each sample that overlaps the scan range.
+      // TODO: In future, Bigtable service may provide finer grained APIs, e.g., to sample given a
+      // filter or to sample on a given key range.
+      for (SampleRowKeysResponse response : samples) {
+        ByteKey currentEndKey = ByteKey.of(response.getRowKey());
+        long currentOffset = response.getOffsetBytes();
+        if (!currentStartKey.isEmpty() && currentStartKey.equals(currentEndKey)) {
+          // Skip an empty region.
+          lastOffset = currentOffset;
+          continue;
+        } else if (range.overlaps(ByteKeyRange.of(currentStartKey, currentEndKey))) {
+          estimatedSizeBytes += currentOffset - lastOffset;
+        }
+        currentStartKey = currentEndKey;
+        lastOffset = currentOffset;
+      }
+      return estimatedSizeBytes;
+    }
+
+    /**
+     * Cloud Bigtable returns query results ordered by key.
+     */
+    @Override
+    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+      return true;
+    }
+
+    @Override
+    public BoundedReader<Row> createReader(PipelineOptions options) throws IOException {
+      return new BigtableReader(this, service);
+    }
+
+    @Override
+    public void validate() {
+      checkArgument(!tableId.isEmpty(), "tableId cannot be empty");
+    }
+
+    @Override
+    public Coder<Row> getDefaultOutputCoder() {
+      return ProtoCoder.of(Row.class);
+    }
+
+    /**
+     * Splits {@code range} into sources derived from this one, each expected to hold about
+     * {@code desiredBundleSizeBytes} of data, given that the whole range holds
+     * {@code sampleSizeBytes}.
+     *
+     * <p>When the range already fits in one bundle it is returned as a single source. Otherwise it
+     * is cut with {@link ByteKeyRange#split} into {@code ceil(sampleSizeBytes /
+     * desiredBundleSizeBytes)} pieces, and each piece is given an equal share of the estimated
+     * size.
+     *
+     * @throws IllegalArgumentException if splitting is needed but either size is not positive
+     */
+    private List<BigtableSource> splitKeyRangeIntoBundleSizedSubranges(
+        long sampleSizeBytes, long desiredBundleSizeBytes, ByteKeyRange range) {
+      logger.debug(
+          "Subsplit for sampleSizeBytes {} and desiredBundleSizeBytes {}",
+          sampleSizeBytes,
+          desiredBundleSizeBytes);
+
+      // Trivial case: the whole range already fits in a single bundle.
+      if (sampleSizeBytes <= desiredBundleSizeBytes) {
+        BigtableSource wholeRange =
+            this.withStartKey(range.getStartKey()).withEndKey(range.getEndKey());
+        return Collections.singletonList(wholeRange);
+      }
+
+      checkArgument(
+          sampleSizeBytes > 0, "Sample size %s bytes must be greater than 0.", sampleSizeBytes);
+      checkArgument(
+          desiredBundleSizeBytes > 0,
+          "Desired bundle size %s bytes must be greater than 0.",
+          desiredBundleSizeBytes);
+
+      int splitCount = (int) Math.ceil(((double) sampleSizeBytes) / (desiredBundleSizeBytes));
+      long bytesPerSubrange = sampleSizeBytes / splitCount;
+      List<ByteKey> boundaries = range.split(splitCount);
+
+      // Each pair of consecutive boundary keys delimits one subrange.
+      ImmutableList.Builder<BigtableSource> subranges = ImmutableList.builder();
+      Iterator<ByteKey> boundaryIter = boundaries.iterator();
+      ByteKey subrangeStart = boundaryIter.next();
+      while (boundaryIter.hasNext()) {
+        ByteKey subrangeEnd = boundaryIter.next();
+        BigtableSource piece =
+            this.withStartKey(subrangeStart)
+                .withEndKey(subrangeEnd)
+                .withEstimatedSizeBytes(bytesPerSubrange);
+        subranges.add(piece);
+        subrangeStart = subrangeEnd;
+      }
+      return subranges.build();
+    }
+
+    /** Returns the key range read by this source. */
+    public ByteKeyRange getRange() {
+      return this.range;
+    }
+
+    /** Returns the row filter applied when reading; may be {@code null} when no filter is set. */
+    public RowFilter getRowFilter() {
+      return this.filter;
+    }
+
+    /** Returns the id of the Cloud Bigtable table read by this source. */
+    public String getTableId() {
+      return this.tableId;
+    }
+  }
+
+  /**
+   * Reads the rows of a {@link BigtableSource} through a {@link BigtableService.Reader}.
+   *
+   * <p>A {@link ByteKeyRangeTracker} over the source's range decides which rows may be returned.
+   * It also backs {@link #getFractionConsumed} and dynamic splitting via {@link #splitAtFraction}.
+   */
+  private static class BigtableReader extends BoundedReader<Row> {
+    // Thread-safety: source is protected via synchronization and is only accessed or modified
+    // inside a synchronized block (or constructor, which is the same).
+    private BigtableSource source;
+    private final BigtableService service;
+    private BigtableService.Reader reader;
+    private final ByteKeyRangeTracker rangeTracker;
+    private long recordsReturned;
+
+    public BigtableReader(BigtableSource source, BigtableService service) {
+      this.source = source;
+      this.service = service;
+      rangeTracker = ByteKeyRangeTracker.of(source.getRange());
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      reader = service.createReader(getCurrentSource());
+      return claimCurrentRow(reader.start());
+    }
+
+    @Override
+    public synchronized BigtableSource getCurrentSource() {
+      return source;
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      return claimCurrentRow(reader.advance());
+    }
+
+    /**
+     * Decides whether the underlying reader's current row may be emitted. The reader must have
+     * produced a row ({@code readerHasRow}), and the range tracker must accept that row's key as
+     * a split point. Accepted rows are counted in {@code recordsReturned}.
+     */
+    private boolean claimCurrentRow(boolean readerHasRow) {
+      // Short-circuit: the current row is only inspected when the reader actually produced one.
+      boolean hasRecord =
+          readerHasRow
+              && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey()));
+      if (hasRecord) {
+        ++recordsReturned;
+      }
+      return hasRecord;
+    }
+
+    @Override
+    public Row getCurrent() throws NoSuchElementException {
+      return reader.getCurrentRow();
+    }
+
+    @Override
+    public void close() throws IOException {
+      logger.info("Closing reader after reading {} records.", recordsReturned);
+      if (reader != null) {
+        reader.close();
+        reader = null;
+      }
+    }
+
+    @Override
+    public final Double getFractionConsumed() {
+      return rangeTracker.getFractionConsumed();
+    }
+
+    /**
+     * Tries to split the current source at the key interpolated at {@code fraction} of its range.
+     * On success this reader keeps the primary part and the residual source is returned.
+     * Otherwise {@code null} is returned and this reader is unchanged.
+     */
+    @Override
+    public final synchronized BigtableSource splitAtFraction(double fraction) {
+      ByteKey splitKey;
+      try {
+        splitKey = source.getRange().interpolateKey(fraction);
+      } catch (IllegalArgumentException e) {
+        // SLF4J substitutes {} placeholders; printf-style %s would be logged literally.
+        logger.info("{}: Failed to interpolate key for fraction {}.", source.getRange(), fraction);
+        return null;
+      }
+      logger.debug(
+          "Proposing to split {} at fraction {} (key {})", rangeTracker, fraction, splitKey);
+      if (!rangeTracker.trySplitAtPosition(splitKey)) {
+        return null;
+      }
+      BigtableSource primary = source.withEndKey(splitKey);
+      BigtableSource residual = source.withStartKey(splitKey);
+      this.source = primary;
+      return residual;
+    }
+  }
+
+  private static class Sink
+      extends org.apache.beam.sdk.io.Sink<KV<ByteString, Iterable<Mutation>>> {
+
+    public Sink(String tableId, BigtableService bigtableService) {
+      this.tableId = checkNotNull(tableId, "tableId");
+      this.bigtableService = checkNotNull(bigtableService, "bigtableService");
+    }
+
+    public String getTableId() {
+      return tableId;
+    }
+
+    public BigtableService getBigtableService() {
+      return bigtableService;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(Sink.class)
+          .add("bigtableService", bigtableService)
+          .add("tableId", tableId)
+          .toString();
+    }
+
+    ///////////////////////////////////////////////////////////////////////////////
+    private final String tableId;
+    private final BigtableService bigtableService;
+
+    @Override
+    public WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> createWriteOperation(
+        PipelineOptions options) {
+      return new BigtableWriteOperation(this);
+    }
+
+    /** Does nothing, as it is redundant with {@link Write#validate}. */
+    @Override
+    public void validate(PipelineOptions options) {}
+  }
+
+  /**
+   * The {@link WriteOperation} for {@link Sink}. It creates a {@link BigtableWriter} per call to
+   * {@link #createWriter}, and on finalization it logs the total number of records written, summed
+   * over the counts the writers returned.
+   */
+  private static class BigtableWriteOperation
+      extends WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> {
+    private final Sink sink;
+
+    public BigtableWriteOperation(Sink sink) {
+      this.sink = sink;
+    }
+
+    @Override
+    public Writer<KV<ByteString, Iterable<Mutation>>, Long> createWriter(PipelineOptions options)
+        throws Exception {
+      return new BigtableWriter(this);
+    }
+
+    /** No global setup is needed before writers run. */
+    @Override
+    public void initialize(PipelineOptions options) {}
+
+    /**
+     * Sums the per-writer record counts and logs the total at debug level.
+     *
+     * @param writerResults the record count returned by each writer's {@code close()}
+     */
+    @Override
+    public void finalize(Iterable<Long> writerResults, PipelineOptions options) {
+      long count = 0;
+      for (Long value : writerResults) {
+        count += value;
+      }
+      logger.debug("Wrote {} elements to BigtableIO.Sink {}", count, sink);
+    }
+
+    @Override
+    public Sink getSink() {
+      return sink;
+    }
+
+    /** Writer results are record counts, encoded as variable-length longs. */
+    @Override
+    public Coder<Long> getWriterResultCoder() {
+      return VarLongCoder.of();
+    }
+  }
+
+  private static class BigtableWriter extends Writer<KV<ByteString, Iterable<Mutation>>, Long> {
+    private final BigtableWriteOperation writeOperation;
+    private final Sink sink;
+    private BigtableService.Writer bigtableWriter;
+    private long recordsWritten;
+    private final ConcurrentLinkedQueue<BigtableWriteException> failures;
+
+    public BigtableWriter(BigtableWriteOperation writeOperation) {
+      this.writeOperation = writeOperation;
+      this.sink = writeOperation.getSink();
+      this.failures = new ConcurrentLinkedQueue<>();
+    }
+
+    @Override
+    public void open(String uId) throws Exception {
+      bigtableWriter = sink.getBigtableService().openForWriting(sink.getTableId());
+      recordsWritten = 0;
+    }
+
+    /**
+     * If any write has asynchronously failed, fail the bundle with a useful error.
+     */
+    private void checkForFailures() throws IOException {
+      // Note that this function is never called by multiple threads and is the only place that
+      // we remove from failures, so this code is safe.
+      if (failures.isEmpty()) {
+        return;
+      }
+
+      StringBuilder logEntry = new StringBuilder();
+      int i = 0;
+      for (; i < 10 && !failures.isEmpty(); ++i) {
+        BigtableWriteException exc = failures.remove();
+        logEntry.append("\n").append(exc.getMessage());
+        if (exc.getCause() != null) {
+          logEntry.append(": ").append(exc.getCause().getMessage());
+        }
+      }
+      String message =
+          String.format(
+              "At least %d errors occurred writing to Bigtable. First %d errors: %s",
+              i + failures.size(),
+              i,
+              logEntry.toString());
+      logger.error(message);
+      throw new IOException(message);
+    }
+
+    @Override
+    public void write(KV<ByteString, Iterable<Mutation>> rowMutations) throws Exception {
+      checkForFailures();
+      Futures.addCallback(
+          bigtableWriter.writeRecord(rowMutations), new WriteExceptionCallback(rowMutations));
+      ++recordsWritten;
+    }
+
+    @Override
+    public Long close() throws Exception {
+      bigtableWriter.close();
+      bigtableWriter = null;
+      checkForFailures();
+      logger.info("Wrote {} records", recordsWritten);
+      return recordsWritten;
+    }
+
+    @Override
+    public WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> getWriteOperation() {
+      return writeOperation;
+    }
+
+    private class WriteExceptionCallback implements FutureCallback<Empty> {
+      private final KV<ByteString, Iterable<Mutation>> value;
+
+      public WriteExceptionCallback(KV<ByteString, Iterable<Mutation>> value) {
+        this.value = value;
+      }
+
+      @Override
+      public void onFailure(Throwable cause) {
+        failures.add(new BigtableWriteException(value, cause));
+      }
+
+      @Override
+      public void onSuccess(Empty produced) {}
+    }
+  }
+
+  /**
+   * An exception that puts information about the failed record being written in its message.
+   */
+  static class BigtableWriteException extends IOException {
+    public BigtableWriteException(KV<ByteString, Iterable<Mutation>> record, Throwable cause) {
+      super(
+          String.format(
+              "Error mutating row %s with mutations %s",
+              record.getKey().toStringUtf8(),
+              record.getValue()),
+          cause);
+    }
+  }
+
+  /**
+   * Builds the Cloud Bigtable user agent string in the form
+   * {@code <sdk name>/<sdk version> (<java spec version>); <bigtable client version>}.
+   */
+  private static String getUserAgent() {
+    ReleaseInfo releaseInfo = ReleaseInfo.getReleaseInfo();
+    String javaSpecVersion = System.getProperty("java.specification.version");
+    // TODO get Bigtable client version directly from jar.
+    String bigtableClientVersion = "0.2.3";
+    return String.format(
+        "%s/%s (%s); %s",
+        releaseInfo.getName(),
+        releaseInfo.getVersion(),
+        javaSpecVersion,
+        bigtableClientVersion);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
new file mode 100644
index 0000000..93e558b
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
+import org.apache.beam.sdk.values.KV;
+
+import com.google.bigtable.v1.Mutation;
+import com.google.bigtable.v1.Row;
+import com.google.bigtable.v1.SampleRowKeysResponse;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Empty;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * An abstraction over Cloud Bigtable, so that callers can run against either the real service or
+ * a fake implementation.
+ */
+interface BigtableService extends Serializable {
+
+  /**
+   * Returns {@code true} if the table with the given id exists.
+   */
+  boolean tableExists(String tableId) throws IOException;
+
+  /**
+   * Returns a {@link Reader} over the rows described by {@code source}.
+   */
+  Reader createReader(BigtableSource source) throws IOException;
+
+  /**
+   * Returns a {@link Writer} that applies mutations to the table with the given id.
+   */
+  Writer openForWriting(String tableId) throws IOException;
+
+  /**
+   * Returns row keys sampled from the underlying table. The samples describe how keys are
+   * distributed within the table.
+   */
+  List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) throws IOException;
+
+  /**
+   * Reads rows from Cloud Bigtable one at a time.
+   */
+  interface Reader {
+    /**
+     * Performs any initialization (such as opening a network connection), reads the first row,
+     * and returns {@code true} if a row was found.
+     */
+    boolean start() throws IOException;
+
+    /**
+     * Tries to read the next row, returning {@code true} if one was read.
+     */
+    boolean advance() throws IOException;
+
+    /**
+     * Returns the row read by the most recent successful {@code start()} or {@code advance()}.
+     *
+     * @throws NoSuchElementException if the most recent such call did not produce a row
+     */
+    Row getCurrentRow() throws NoSuchElementException;
+
+    /**
+     * Releases the reader's resources.
+     *
+     * @throws IOException if closing fails
+     */
+    void close() throws IOException;
+  }
+
+  /**
+   * Submits row mutations to Cloud Bigtable.
+   */
+  interface Writer {
+    /**
+     * Submits a single-row transaction. The key of {@code record} is the row key to mutate, and
+     * its value is the sequence of mutations to apply to that row.
+     *
+     * @return a future that completes when the write finishes
+     * @throws IOException if the write could not be submitted
+     */
+    ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record)
+        throws IOException;
+
+    /**
+     * Closes the writer.
+     *
+     * @throws IOException if any writes did not succeed
+     */
+    void close() throws IOException;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
new file mode 100644
index 0000000..5933e13
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
+import org.apache.beam.sdk.values.KV;
+
+import com.google.bigtable.admin.table.v1.GetTableRequest;
+import com.google.bigtable.v1.MutateRowRequest;
+import com.google.bigtable.v1.Mutation;
+import com.google.bigtable.v1.ReadRowsRequest;
+import com.google.bigtable.v1.Row;
+import com.google.bigtable.v1.RowRange;
+import com.google.bigtable.v1.SampleRowKeysRequest;
+import com.google.bigtable.v1.SampleRowKeysResponse;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.grpc.BigtableSession;
+import com.google.cloud.bigtable.grpc.async.AsyncExecutor;
+import com.google.cloud.bigtable.grpc.async.HeapSizeManager;
+import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
+import com.google.common.base.MoreObjects;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Empty;
+
+import io.grpc.Status.Code;
+import io.grpc.StatusRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * An implementation of {@link BigtableService} that actually communicates with the Cloud Bigtable
+ * service.
+ */
+class BigtableServiceImpl implements BigtableService {
+  private static final Logger logger = LoggerFactory.getLogger(BigtableService.class);
+
+  public BigtableServiceImpl(BigtableOptions options) {
+    this.options = options;
+  }
+
+  private final BigtableOptions options;
+
+  @Override
+  public BigtableWriterImpl openForWriting(String tableId) throws IOException {
+    BigtableSession session = new BigtableSession(options);
+    String tableName = options.getClusterName().toTableNameStr(tableId);
+    return new BigtableWriterImpl(session, tableName);
+  }
+
+  @Override
+  public boolean tableExists(String tableId) throws IOException {
+    if (!BigtableSession.isAlpnProviderEnabled()) {
+      logger.info(
+          "Skipping existence check for table {} (BigtableOptions {}) because ALPN is not"
+              + " configured.",
+          tableId,
+          options);
+      return true;
+    }
+
+    try (BigtableSession session = new BigtableSession(options)) {
+      GetTableRequest getTable =
+          GetTableRequest.newBuilder()
+              .setName(options.getClusterName().toTableNameStr(tableId))
+              .build();
+      session.getTableAdminClient().getTable(getTable);
+      return true;
+    } catch (StatusRuntimeException e) {
+      if (e.getStatus().getCode() == Code.NOT_FOUND) {
+        return false;
+      }
+      String message =
+          String.format(
+              "Error checking whether table %s (BigtableOptions %s) exists", tableId, options);
+      logger.error(message, e);
+      throw new IOException(message, e);
+    }
+  }
+
+  private class BigtableReaderImpl implements Reader {
+    private BigtableSession session;
+    private final BigtableSource source;
+    private ResultScanner<Row> results;
+    private Row currentRow;
+
+    public BigtableReaderImpl(BigtableSession session, BigtableSource source) {
+      this.session = session;
+      this.source = source;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      RowRange range =
+          RowRange.newBuilder()
+              .setStartKey(source.getRange().getStartKey().getValue())
+              .setEndKey(source.getRange().getEndKey().getValue())
+              .build();
+      ReadRowsRequest.Builder requestB =
+          ReadRowsRequest.newBuilder()
+              .setRowRange(range)
+              .setTableName(options.getClusterName().toTableNameStr(source.getTableId()));
+      if (source.getRowFilter() != null) {
+        requestB.setFilter(source.getRowFilter());
+      }
+      results = session.getDataClient().readRows(requestB.build());
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      currentRow = results.next();
+      return (currentRow != null);
+    }
+
+    @Override
+    public void close() throws IOException {
+      // Goal: by the end of this function, both results and session are null and closed,
+      // independent of what errors they throw or prior state.
+
+      if (session == null) {
+        // Only possible when previously closed, so we know that results is also null.
+        return;
+      }
+
+      // Session does not implement Closeable -- it's AutoCloseable. So we can't register it with
+      // the Closer, but we can use the Closer to simplify the error handling.
+      try (Closer closer = Closer.create()) {
+        if (results != null) {
+          closer.register(results);
+          results = null;
+        }
+
+        session.close();
+      } finally {
+        session = null;
+      }
+    }
+
+    @Override
+    public Row getCurrentRow() throws NoSuchElementException {
+      if (currentRow == null) {
+        throw new NoSuchElementException();
+      }
+      return currentRow;
+    }
+  }
+
+  private static class BigtableWriterImpl implements Writer {
+    private BigtableSession session;
+    private AsyncExecutor executor;
+    private final MutateRowRequest.Builder partialBuilder;
+
+    public BigtableWriterImpl(BigtableSession session, String tableName) {
+      this.session = session;
+      this.executor =
+          new AsyncExecutor(
+              session.getDataClient(),
+              new HeapSizeManager(
+                  AsyncExecutor.ASYNC_MUTATOR_MAX_MEMORY_DEFAULT,
+                  AsyncExecutor.MAX_INFLIGHT_RPCS_DEFAULT));
+
+      partialBuilder = MutateRowRequest.newBuilder().setTableName(tableName);
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        if (executor != null) {
+          executor.flush();
+          executor = null;
+        }
+      } finally {
+        if (session != null) {
+          session.close();
+          session = null;
+        }
+      }
+    }
+
+    @Override
+    public ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record)
+        throws IOException {
+      MutateRowRequest r =
+          partialBuilder
+              .clone()
+              .setRowKey(record.getKey())
+              .addAllMutations(record.getValue())
+              .build();
+      try {
+        return executor.mutateRowAsync(r);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException("Write interrupted", e);
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects
+        .toStringHelper(BigtableServiceImpl.class)
+        .add("options", options)
+        .toString();
+  }
+
+  @Override
+  public Reader createReader(BigtableSource source) throws IOException {
+    BigtableSession session = new BigtableSession(options);
+    return new BigtableReaderImpl(session, source);
+  }
+
+  @Override
+  public List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) throws IOException {
+    try (BigtableSession session = new BigtableSession(options)) {
+      SampleRowKeysRequest request =
+          SampleRowKeysRequest.newBuilder()
+              .setTableName(options.getClusterName().toTableNameStr(source.getTableId()))
+              .build();
+      return session.getDataClient().sampleRowKeys(request);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/package-info.java
new file mode 100644
index 0000000..c4c8c04
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines transforms for reading from and writing to Google Cloud Bigtable.
+ *
+ * @see org.apache.beam.sdk.io.gcp.bigtable.BigtableIO
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
new file mode 100644
index 0000000..403ad9d
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -0,0 +1,729 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import static org.apache.beam.sdk.testing.SourceTestUtils.assertSourcesEqualReferenceSource;
+import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive;
+import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
+import static org.apache.beam.sdk.testing.SourceTestUtils
+    .assertSplitAtFractionSucceedsAndConsistent;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Verify.verifyNotNull;
+
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
+import org.apache.beam.sdk.io.range.ByteKey;
+import org.apache.beam.sdk.io.range.ByteKeyRange;
+import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+import com.google.bigtable.v1.Cell;
+import com.google.bigtable.v1.Column;
+import com.google.bigtable.v1.Family;
+import com.google.bigtable.v1.Mutation;
+import com.google.bigtable.v1.Mutation.SetCell;
+import com.google.bigtable.v1.Row;
+import com.google.bigtable.v1.RowFilter;
+import com.google.bigtable.v1.SampleRowKeysResponse;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Empty;
+
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import javax.annotation.Nullable;
+
+/**
+ * Unit tests for {@link BigtableIO}.
+ */
+@RunWith(JUnit4.class)
+public class BigtableIOTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  @Rule public ExpectedLogs logged = ExpectedLogs.none(BigtableIO.class);
+
+  /**
+   * These tests require a static instance of the {@link FakeBigtableService} because the writers
+   * go through a serialization step when executing the test and would not affect passed-in objects
+   * otherwise.
+   */
+  private static FakeBigtableService service;
+  private static final BigtableOptions BIGTABLE_OPTIONS =
+      new BigtableOptions.Builder()
+          .setProjectId("project")
+          .setClusterId("cluster")
+          .setZoneId("zone")
+          .build();
+  private static BigtableIO.Read defaultRead =
+      BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS);
+  private static BigtableIO.Write defaultWrite =
+      BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS);
+  private Coder<KV<ByteString, Iterable<Mutation>>> bigtableCoder;
+  private static final TypeDescriptor<KV<ByteString, Iterable<Mutation>>> BIGTABLE_WRITE_TYPE =
+      new TypeDescriptor<KV<ByteString, Iterable<Mutation>>>() {};
+
+  @Before
+  public void setup() throws Exception {
+    service = new FakeBigtableService();
+    defaultRead = defaultRead.withBigtableService(service);
+    defaultWrite = defaultWrite.withBigtableService(service);
+    bigtableCoder = TestPipeline.create().getCoderRegistry().getCoder(BIGTABLE_WRITE_TYPE);
+  }
+
+  @Test
+  public void testReadBuildsCorrectly() {
+    BigtableIO.Read read =
+        BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS).withTableId("table");
+    assertEquals("project", read.getBigtableOptions().getProjectId());
+    assertEquals("cluster", read.getBigtableOptions().getClusterId());
+    assertEquals("zone", read.getBigtableOptions().getZoneId());
+    assertEquals("table", read.getTableId());
+  }
+
+  @Test
+  public void testReadBuildsCorrectlyInDifferentOrder() {
+    BigtableIO.Read read =
+        BigtableIO.read().withTableId("table").withBigtableOptions(BIGTABLE_OPTIONS);
+    assertEquals("project", read.getBigtableOptions().getProjectId());
+    assertEquals("cluster", read.getBigtableOptions().getClusterId());
+    assertEquals("zone", read.getBigtableOptions().getZoneId());
+    assertEquals("table", read.getTableId());
+  }
+
+  @Test
+  public void testWriteBuildsCorrectly() {
+    BigtableIO.Write write =
+        BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS).withTableId("table");
+    assertEquals("table", write.getTableId());
+    assertEquals("project", write.getBigtableOptions().getProjectId());
+    assertEquals("zone", write.getBigtableOptions().getZoneId());
+    assertEquals("cluster", write.getBigtableOptions().getClusterId());
+  }
+
+  @Test
+  public void testWriteBuildsCorrectlyInDifferentOrder() {
+    BigtableIO.Write write =
+        BigtableIO.write().withTableId("table").withBigtableOptions(BIGTABLE_OPTIONS);
+    assertEquals("cluster", write.getBigtableOptions().getClusterId());
+    assertEquals("project", write.getBigtableOptions().getProjectId());
+    assertEquals("zone", write.getBigtableOptions().getZoneId());
+    assertEquals("table", write.getTableId());
+  }
+
+  @Test
+  public void testWriteValidationFailsMissingTable() {
+    BigtableIO.Write write = BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS);
+
+    thrown.expect(IllegalArgumentException.class);
+
+    write.validate(null);
+  }
+
+  @Test
+  public void testWriteValidationFailsMissingOptions() {
+    BigtableIO.Write write = BigtableIO.write().withTableId("table");
+
+    thrown.expect(IllegalArgumentException.class);
+
+    write.validate(null);
+  }
+
+  /** Helper function to make a single row mutation to be written. */
+  private static KV<ByteString, Iterable<Mutation>> makeWrite(String key, String value) {
+    ByteString rowKey = ByteString.copyFromUtf8(key);
+    Iterable<Mutation> mutations =
+        ImmutableList.of(
+            Mutation.newBuilder()
+                .setSetCell(SetCell.newBuilder().setValue(ByteString.copyFromUtf8(value)))
+                .build());
+    return KV.of(rowKey, mutations);
+  }
+
+  /** Helper function to make a single bad row mutation (no set cell). */
+  private static KV<ByteString, Iterable<Mutation>> makeBadWrite(String key) {
+    Iterable<Mutation> mutations = ImmutableList.of(Mutation.newBuilder().build());
+    return KV.of(ByteString.copyFromUtf8(key), mutations);
+  }
+
+  /** Tests that when reading from a non-existent table, the read fails. */
+  @Test
+  public void testReadingFailsTableDoesNotExist() throws Exception {
+    final String table = "TEST-TABLE";
+
+    BigtableIO.Read read =
+        BigtableIO.read()
+            .withBigtableOptions(BIGTABLE_OPTIONS)
+            .withTableId(table)
+            .withBigtableService(service);
+
+    // Exception will be thrown by read.validate() when read is applied.
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(String.format("Table %s does not exist", table));
+
+    TestPipeline.create().apply(read);
+  }
+
+  /** Tests that when reading from an empty table, the read succeeds. */
+  @Test
+  public void testReadingEmptyTable() throws Exception {
+    final String table = "TEST-EMPTY-TABLE";
+    service.createTable(table);
+
+    TestPipeline p = TestPipeline.create();
+    PCollection<Row> rows = p.apply(defaultRead.withTableId(table));
+    PAssert.that(rows).empty();
+
+    p.run();
+    logged.verifyInfo("Closing reader after reading 0 records.");
+  }
+
+  /** Tests reading all rows from a table. */
+  @Test
+  public void testReading() throws Exception {
+    final String table = "TEST-MANY-ROWS-TABLE";
+    final int numRows = 1001;
+    List<Row> testRows = makeTableData(table, numRows);
+
+    TestPipeline p = TestPipeline.create();
+    PCollection<Row> rows = p.apply(defaultRead.withTableId(table));
+    PAssert.that(rows).containsInAnyOrder(testRows);
+
+    p.run();
+    logged.verifyInfo(String.format("Closing reader after reading %d records.", numRows));
+  }
+
+  /** A {@link Predicate} that tests whether a row key matches the given regex. */
+  private static class KeyMatchesRegex implements Predicate<ByteString> {
+    private final String regex;
+
+    public KeyMatchesRegex(String regex) {
+      this.regex = regex;
+    }
+
+    @Override
+    public boolean apply(@Nullable ByteString input) {
+      verifyNotNull(input, "input");
+      return input.toStringUtf8().matches(regex);
+    }
+  }
+
+  /** Tests reading all rows using a filter. */
+  @Test
+  public void testReadingWithFilter() throws Exception {
+    final String table = "TEST-FILTER-TABLE";
+    final int numRows = 1001;
+    List<Row> testRows = makeTableData(table, numRows);
+    String regex = ".*17.*";
+    final KeyMatchesRegex keyPredicate = new KeyMatchesRegex(regex);
+    Iterable<Row> filteredRows =
+        Iterables.filter(
+            testRows,
+            new Predicate<Row>() {
+              @Override
+              public boolean apply(@Nullable Row input) {
+                verifyNotNull(input, "input");
+                return keyPredicate.apply(input.getKey());
+              }
+            });
+
+    RowFilter filter =
+        RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(regex)).build();
+
+    TestPipeline p = TestPipeline.create();
+    PCollection<Row> rows = p.apply(defaultRead.withTableId(table).withRowFilter(filter));
+    PAssert.that(rows).containsInAnyOrder(filteredRows);
+
+    p.run();
+  }
+
+  /**
+   * Tests dynamic work rebalancing exhaustively.
+   *
+   * <p>Because this test runs so slowly, it is disabled by default. Re-run when changing the
+   * {@link BigtableIO.Read} implementation.
+   */
+  @Ignore("Slow. Rerun when changing the implementation.")
+  @Test
+  public void testReadingSplitAtFractionExhaustive() throws Exception {
+    final String table = "TEST-FEW-ROWS-SPLIT-EXHAUSTIVE-TABLE";
+    final int numRows = 10;
+    final int numSamples = 1;
+    final long bytesPerRow = 1L;
+    makeTableData(table, numRows);
+    service.setupSampleRowKeys(table, numSamples, bytesPerRow);
+
+    BigtableSource source =
+        new BigtableSource(service, table, null, service.getTableRange(table), null);
+    assertSplitAtFractionExhaustive(source, null);
+  }
+
+  /**
+   * Unit tests of splitAtFraction.
+   */
+  @Test
+  public void testReadingSplitAtFraction() throws Exception {
+    final String table = "TEST-SPLIT-AT-FRACTION";
+    final int numRows = 10;
+    final int numSamples = 1;
+    final long bytesPerRow = 1L;
+    makeTableData(table, numRows);
+    service.setupSampleRowKeys(table, numSamples, bytesPerRow);
+
+    BigtableSource source =
+        new BigtableSource(service, table, null, service.getTableRange(table), null);
+    // With 0 items read, all split requests will fail.
+    assertSplitAtFractionFails(source, 0, 0.1, null /* options */);
+    assertSplitAtFractionFails(source, 0, 1.0, null /* options */);
+    // With 1 item read, all split requests past 1/10th will succeed.
+    assertSplitAtFractionSucceedsAndConsistent(source, 1, 0.333, null /* options */);
+    assertSplitAtFractionSucceedsAndConsistent(source, 1, 0.666, null /* options */);
+    // With 3 items read, all split requests past 3/10ths will succeed.
+    assertSplitAtFractionFails(source, 3, 0.2, null /* options */);
+    assertSplitAtFractionSucceedsAndConsistent(source, 3, 0.571, null /* options */);
+    assertSplitAtFractionSucceedsAndConsistent(source, 3, 0.9, null /* options */);
+    // With 6 items read, all split requests past 6/10ths will succeed.
+    assertSplitAtFractionFails(source, 6, 0.5, null /* options */);
+    assertSplitAtFractionSucceedsAndConsistent(source, 6, 0.7, null /* options */);
+  }
+
+  /** Tests reading all rows from a split table. */
+  @Test
+  public void testReadingWithSplits() throws Exception {
+    final String table = "TEST-MANY-ROWS-SPLITS-TABLE";
+    final int numRows = 1500;
+    final int numSamples = 10;
+    final long bytesPerRow = 100L;
+
+    // Set up test table data and sample row keys for size estimation and splitting.
+    makeTableData(table, numRows);
+    service.setupSampleRowKeys(table, numSamples, bytesPerRow);
+
+    // Generate source and split it.
+    BigtableSource source =
+        new BigtableSource(service, table, null /*filter*/, ByteKeyRange.ALL_KEYS, null /*size*/);
+    List<BigtableSource> splits =
+        source.splitIntoBundles(numRows * bytesPerRow / numSamples, null /* options */);
+
+    // Test num splits and split equality.
+    assertThat(splits, hasSize(numSamples));
+    assertSourcesEqualReferenceSource(source, splits, null /* options */);
+  }
+
+  /** Tests reading all rows from a sub-split table. */
+  @Test
+  public void testReadingWithSubSplits() throws Exception {
+    final String table = "TEST-MANY-ROWS-SUB-SPLITS-TABLE";
+    final int numRows = 1000;
+    final int numSamples = 10;
+    final int numSplits = 20;
+    final long bytesPerRow = 100L;
+
+    // Set up test table data and sample row keys for size estimation and splitting.
+    makeTableData(table, numRows);
+    service.setupSampleRowKeys(table, numSamples, bytesPerRow);
+
+    // Generate source and split it.
+    BigtableSource source =
+        new BigtableSource(service, table, null /*filter*/, ByteKeyRange.ALL_KEYS, null /*size*/);
+    List<BigtableSource> splits = source.splitIntoBundles(numRows * bytesPerRow / numSplits, null);
+
+    // Test num splits and split equality.
+    assertThat(splits, hasSize(numSplits));
+    assertSourcesEqualReferenceSource(source, splits, null /* options */);
+  }
+
+  /** Tests reading the rows that match a row key filter from a sub-split table. */
+  @Test
+  public void testReadingWithFilterAndSubSplits() throws Exception {
+    final String table = "TEST-FILTER-SUB-SPLITS";
+    final int numRows = 1700;
+    final int numSamples = 10;
+    final int numSplits = 20;
+    final long bytesPerRow = 100L;
+
+    // Set up test table data and sample row keys for size estimation and splitting.
+    makeTableData(table, numRows);
+    service.setupSampleRowKeys(table, numSamples, bytesPerRow);
+
+    // Generate source and split it.
+    RowFilter filter =
+        RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(".*17.*")).build();
+    BigtableSource source =
+        new BigtableSource(service, table, filter, ByteKeyRange.ALL_KEYS, null /*size*/);
+    List<BigtableSource> splits = source.splitIntoBundles(numRows * bytesPerRow / numSplits, null);
+
+    // Test num splits and split equality.
+    assertThat(splits, hasSize(numSplits));
+    assertSourcesEqualReferenceSource(source, splits, null /* options */);
+  }
+
+  @Test
+  public void testReadingDisplayData() {
+    RowFilter rowFilter = RowFilter.newBuilder()
+        .setRowKeyRegexFilter(ByteString.copyFromUtf8("foo.*"))
+        .build();
+
+    BigtableIO.Read read = BigtableIO.read()
+        .withBigtableOptions(BIGTABLE_OPTIONS)
+        .withTableId("fooTable")
+        .withRowFilter(rowFilter);
+
+    DisplayData displayData = DisplayData.from(read);
+
+    assertThat(displayData, hasDisplayItem("tableId", "fooTable"));
+    assertThat(displayData, hasDisplayItem("rowFilter", rowFilter.toString()));
+
+    // BigtableIO adds user-agent to options; assert only on key and not value.
+    assertThat(displayData, hasDisplayItem(hasKey("bigtableOptions")));
+  }
+
+  /** Tests that a record gets written to the service and messages are logged. */
+  @Test
+  public void testWriting() throws Exception {
+    final String table = "table";
+    final String key = "key";
+    final String value = "value";
+
+    service.createTable(table);
+
+    TestPipeline p = TestPipeline.create();
+    p.apply("single row", Create.of(makeWrite(key, value)).withCoder(bigtableCoder))
+        .apply("write", defaultWrite.withTableId(table));
+    p.run();
+
+    logged.verifyInfo("Wrote 1 records");
+
+    assertEquals(1, service.tables.size());
+    assertNotNull(service.getTable(table));
+    Map<ByteString, ByteString> rows = service.getTable(table);
+    assertEquals(1, rows.size());
+    assertEquals(ByteString.copyFromUtf8(value), rows.get(ByteString.copyFromUtf8(key)));
+  }
+
+  /** Tests that when writing to a non-existent table, the write fails. */
+  @Test
+  public void testWritingFailsTableDoesNotExist() throws Exception {
+    final String table = "TEST-TABLE";
+
+    PCollection<KV<ByteString, Iterable<Mutation>>> emptyInput =
+        TestPipeline.create().apply(Create.<KV<ByteString, Iterable<Mutation>>>of());
+
+    // Exception will be thrown by write.validate() when write is applied.
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(String.format("Table %s does not exist", table));
+
+    emptyInput.apply("write", defaultWrite.withTableId(table));
+  }
+
+  /** Tests that when writing an element fails, the write fails. */
+  @Test
+  public void testWritingFailsBadElement() throws Exception {
+    final String table = "TEST-TABLE";
+    final String key = "KEY";
+    service.createTable(table);
+
+    TestPipeline p = TestPipeline.create();
+    p.apply(Create.of(makeBadWrite(key)).withCoder(bigtableCoder))
+        .apply(defaultWrite.withTableId(table));
+
+    thrown.expect(PipelineExecutionException.class);
+    thrown.expectCause(Matchers.<Throwable>instanceOf(IOException.class));
+    thrown.expectMessage("At least 1 errors occurred writing to Bigtable. First 1 errors:");
+    thrown.expectMessage("Error mutating row " + key + " with mutations []: cell value missing");
+    p.run();
+  }
+
+  @Test
+  public void testWritingDisplayData() {
+    BigtableIO.Write write = BigtableIO.write()
+        .withTableId("fooTable")
+        .withBigtableOptions(BIGTABLE_OPTIONS);
+
+    DisplayData displayData = DisplayData.from(write);
+
+    assertThat(displayData, hasDisplayItem("tableId", "fooTable"));
+    // BigtableIO adds user-agent to options; assert only on key and not value.
+    assertThat(displayData, hasDisplayItem(hasKey("bigtableOptions")));
+  }
+
+  ////////////////////////////////////////////////////////////////////////////////////////////
+  private static final String COLUMN_FAMILY_NAME = "family";
+  private static final ByteString COLUMN_NAME = ByteString.copyFromUtf8("column");
+  private static final Column TEST_COLUMN = Column.newBuilder().setQualifier(COLUMN_NAME).build();
+  private static final Family TEST_FAMILY = Family.newBuilder().setName(COLUMN_FAMILY_NAME).build();
+
+  /** Helper function that builds a {@link Row} in a test table that could be returned by read. */
+  private static Row makeRow(ByteString key, ByteString value) {
+    // Store the value as a single cell of TEST_COLUMN inside TEST_FAMILY.
+    Column.Builder newColumn = TEST_COLUMN.toBuilder().addCells(Cell.newBuilder().setValue(value));
+    return Row.newBuilder()
+        .setKey(key)
+        .addFamilies(TEST_FAMILY.toBuilder().addColumns(newColumn))
+        .build();
+  }
+
+  /** Helper function to create a table and return the rows that it created. */
+  private static List<Row> makeTableData(String tableId, int numRows) {
+    service.createTable(tableId);
+    Map<ByteString, ByteString> testData = service.getTable(tableId);
+
+    List<Row> testRows = new ArrayList<>(numRows);
+    for (int i = 0; i < numRows; ++i) {
+      ByteString key = ByteString.copyFromUtf8(String.format("key%09d", i));
+      ByteString value = ByteString.copyFromUtf8(String.format("value%09d", i));
+      testData.put(key, value);
+      testRows.add(makeRow(key, value));
+    }
+
+    return testRows;
+  }
+
+
+  /**
+   * A {@link BigtableService} implementation that stores tables and their contents in memory.
+   */
+  private static class FakeBigtableService implements BigtableService {
+    private final Map<String, SortedMap<ByteString, ByteString>> tables = new HashMap<>();
+    private final Map<String, List<SampleRowKeysResponse>> sampleRowKeys = new HashMap<>();
+
+    @Nullable
+    public SortedMap<ByteString, ByteString> getTable(String tableId) {
+      return tables.get(tableId);
+    }
+
+    public ByteKeyRange getTableRange(String tableId) {
+      verifyTableExists(tableId);
+      SortedMap<ByteString, ByteString> data = tables.get(tableId);
+      return ByteKeyRange.of(ByteKey.of(data.firstKey()), ByteKey.of(data.lastKey()));
+    }
+
+    public void createTable(String tableId) {
+      tables.put(tableId, new TreeMap<ByteString, ByteString>(new ByteStringComparator()));
+    }
+
+    @Override
+    public boolean tableExists(String tableId) {
+      return tables.containsKey(tableId);
+    }
+
+    public void verifyTableExists(String tableId) {
+      checkArgument(tableExists(tableId), "Table %s does not exist", tableId);
+    }
+
+    @Override
+    public FakeBigtableReader createReader(BigtableSource source) {
+      return new FakeBigtableReader(source);
+    }
+
+    @Override
+    public FakeBigtableWriter openForWriting(String tableId) {
+      return new FakeBigtableWriter(tableId);
+    }
+
+    @Override
+    public List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) {
+      List<SampleRowKeysResponse> samples = sampleRowKeys.get(source.getTableId());
+      checkArgument(samples != null, "No samples found for table %s", source.getTableId());
+      return samples;
+    }
+
+    /** Sets up the sample row keys for the specified table. */
+    void setupSampleRowKeys(String tableId, int numSamples, long bytesPerRow) {
+      verifyTableExists(tableId);
+      checkArgument(numSamples > 0, "Number of samples must be positive: %s", numSamples);
+      checkArgument(bytesPerRow > 0, "Bytes/Row must be positive: %s", bytesPerRow);
+
+      ImmutableList.Builder<SampleRowKeysResponse> ret = ImmutableList.builder();
+      SortedMap<ByteString, ByteString> rows = getTable(tableId);
+      int currentSample = 1;
+      int rowsSoFar = 0;
+      for (Map.Entry<ByteString, ByteString> entry : rows.entrySet()) {
+        if (((double) rowsSoFar) / rows.size() >= ((double) currentSample) / numSamples) {
+          // add the sample with the total number of bytes in the table before this key.
+          ret.add(
+              SampleRowKeysResponse.newBuilder()
+                  .setRowKey(entry.getKey())
+                  .setOffsetBytes(rowsSoFar * bytesPerRow)
+                  .build());
+          // Move on to next sample
+          currentSample++;
+        }
+        ++rowsSoFar;
+      }
+
+      // Add the last sample indicating the end of the table, with all rows before it.
+      ret.add(SampleRowKeysResponse.newBuilder().setOffsetBytes(rows.size() * bytesPerRow).build());
+      sampleRowKeys.put(tableId, ret.build());
+    }
+  }
+
+  /**
+   * A {@link BigtableService.Reader} implementation that reads from the static instance of
+   * {@link FakeBigtableService} stored in {@link #service}.
+   *
+   * <p>This reader only supports {@link RowFilter RowFilters} that set a row key regex filter.
+   */
+  private static class FakeBigtableReader implements BigtableService.Reader {
+    private final BigtableSource source;
+    private Iterator<Map.Entry<ByteString, ByteString>> rows;
+    private Row currentRow;
+    private Predicate<ByteString> filter;
+
+    public FakeBigtableReader(BigtableSource source) {
+      this.source = source;
+      if (source.getRowFilter() == null) {
+        filter = Predicates.alwaysTrue();
+      } else {
+        ByteString keyRegex = source.getRowFilter().getRowKeyRegexFilter();
+        checkArgument(!keyRegex.isEmpty(), "Only RowKeyRegexFilter is supported");
+        filter = new KeyMatchesRegex(keyRegex.toStringUtf8());
+      }
+      service.verifyTableExists(source.getTableId());
+    }
+
+    @Override
+    public boolean start() {
+      rows = service.tables.get(source.getTableId()).entrySet().iterator();
+      return advance();
+    }
+
+    @Override
+    public boolean advance() {
+      // Loop until we find a row in range, or reach the end of the iterator.
+      Map.Entry<ByteString, ByteString> entry = null;
+      while (rows.hasNext()) {
+        entry = rows.next();
+        if (!filter.apply(entry.getKey())
+            || !source.getRange().containsKey(ByteKey.of(entry.getKey()))) {
+          // Does not match row filter or does not match source range. Skip.
+          entry = null;
+          continue;
+        }
+        // Found a row inside this source's key range, stop.
+        break;
+      }
+
+      // Return false if no more rows.
+      if (entry == null) {
+        currentRow = null;
+        return false;
+      }
+
+      // Set the current row and return true.
+      currentRow = makeRow(entry.getKey(), entry.getValue());
+      return true;
+    }
+
+    @Override
+    public Row getCurrentRow() {
+      if (currentRow == null) {
+        throw new NoSuchElementException();
+      }
+      return currentRow;
+    }
+
+    @Override
+    public void close() {
+      rows = null;
+      currentRow = null;
+    }
+  }
+
+  /**
+   * A {@link BigtableService.Writer} implementation that writes to the static instance of
+   * {@link FakeBigtableService} stored in {@link #service}.
+   *
+   * <p>This writer only supports {@link Mutation Mutations} that consist only of {@link SetCell}
+   * entries. The column family in the {@link SetCell} is ignored; only the value is used.
+   *
+   * <p>When no {@link SetCell} is provided, the write will fail and this will be exposed via an
+   * exception on the returned {@link ListenableFuture}.
+   */
+  private static class FakeBigtableWriter implements BigtableService.Writer {
+    private final String tableId;
+
+    public FakeBigtableWriter(String tableId) {
+      this.tableId = tableId;
+    }
+
+    @Override
+    public ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record) {
+      service.verifyTableExists(tableId);
+      Map<ByteString, ByteString> table = service.getTable(tableId);
+      ByteString key = record.getKey();
+      for (Mutation m : record.getValue()) {
+        SetCell cell = m.getSetCell();
+        if (cell.getValue().isEmpty()) {
+          return Futures.immediateFailedCheckedFuture(new IOException("cell value missing"));
+        }
+        table.put(key, cell.getValue());
+      }
+      return Futures.immediateFuture(Empty.getDefaultInstance());
+    }
+
+    @Override
+    public void close() {}
+  }
+
+  /** A serializable comparator for ByteString. Used to order the rows of the fake tables. */
+  private static final class ByteStringComparator implements Comparator<ByteString>, Serializable {
+    @Override
+    public int compare(ByteString o1, ByteString o2) {
+      return ByteKey.of(o1).compareTo(ByteKey.of(o2));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 95d1f55..0027e0e 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -35,6 +35,7 @@
   (sources and sinks) to consume and produce data from systems.</description>
 
   <modules>
+    <module>google-cloud-platform</module>
     <module>hdfs</module>
     <module>kafka</module>
   </modules>


[2/3] incubator-beam git commit: [BEAM-77] Move bigtable in IO

Posted by dh...@apache.org.
[BEAM-77] Move bigtable in IO


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d138ae54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d138ae54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d138ae54

Branch: refs/heads/master
Commit: d138ae5445315ced3e0a198ab1e99773a5ebfee0
Parents: 9746f0d
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed Apr 20 07:40:49 2016 +0200
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 26 14:08:54 2016 -0700

----------------------------------------------------------------------
 pom.xml                                         |    1 -
 sdks/java/core/pom.xml                          |   25 -
 .../apache/beam/sdk/io/bigtable/BigtableIO.java | 1016 ------------------
 .../beam/sdk/io/bigtable/BigtableService.java   |  111 --
 .../sdk/io/bigtable/BigtableServiceImpl.java    |  244 -----
 .../beam/sdk/io/bigtable/package-info.java      |   23 -
 .../beam/sdk/io/bigtable/BigtableIOTest.java    |  729 -------------
 sdks/java/io/google-cloud-platform/pom.xml      |   96 ++
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    | 1016 ++++++++++++++++++
 .../sdk/io/gcp/bigtable/BigtableService.java    |  111 ++
 .../io/gcp/bigtable/BigtableServiceImpl.java    |  244 +++++
 .../beam/sdk/io/gcp/bigtable/package-info.java  |   23 +
 .../sdk/io/gcp/bigtable/BigtableIOTest.java     |  729 +++++++++++++
 sdks/java/io/pom.xml                            |    1 +
 14 files changed, 2220 insertions(+), 2149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 03f0fb2..1bd18ec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,7 +102,6 @@
     <!-- If updating dependencies, please update any relevant javadoc offlineLinks -->
     <avro.version>1.7.7</avro.version>
     <bigquery.version>v2-rev248-1.21.0</bigquery.version>
-    <bigtable.version>0.2.3</bigtable.version>
     <pubsubgrpc.version>0.0.2</pubsubgrpc.version>
     <clouddebugger.version>v2-rev6-1.21.0</clouddebugger.version>
     <dataflow.version>v1b3-rev22-1.21.0</dataflow.version>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index c634e9c..07d2fce 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -227,7 +227,6 @@
               <shadeTestJar>true</shadeTestJar>
               <artifactSet>
                 <includes>
-                  <include>com.google.cloud.bigtable:bigtable-client-core</include>
                   <include>com.google.guava:guava</include>
                 </includes>
               </artifactSet>
@@ -253,17 +252,6 @@
                   <pattern>com.google.thirdparty</pattern>
                   <shadedPattern>org.apache.beam.sdk.repackaged.com.google.thirdparty</shadedPattern>
                 </relocation>
-                <relocation>
-                  <pattern>com.google.cloud.bigtable</pattern>
-                  <shadedPattern>org.apache.beam.sdk.repackaged.com.google.cloud.bigtable</shadedPattern>
-                  <excludes>
-                    <exclude>com.google.cloud.bigtable.config.BigtableOptions*</exclude>
-                    <exclude>com.google.cloud.bigtable.config.CredentialOptions*</exclude>
-                    <exclude>com.google.cloud.bigtable.config.RetryOptions*</exclude>
-                    <exclude>com.google.cloud.bigtable.grpc.BigtableClusterName</exclude>
-                    <exclude>com.google.cloud.bigtable.grpc.BigtableTableName</exclude>
-                  </excludes>
-                </relocation>
               </relocations>
             </configuration>
           </execution>
@@ -281,7 +269,6 @@
               <finalName>${project.artifactId}-bundled-${project.version}</finalName>
               <artifactSet>
                 <excludes>
-                  <exclude>com.google.cloud.bigtable:bigtable-client-core</exclude>
                   <exclude>com.google.guava:guava</exclude>
                 </excludes>
               </artifactSet>
@@ -394,18 +381,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.google.cloud.bigtable</groupId>
-      <artifactId>bigtable-protos</artifactId>
-      <version>${bigtable.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.cloud.bigtable</groupId>
-      <artifactId>bigtable-client-core</artifactId>
-      <version>${bigtable.version}</version>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.api-client</groupId>
       <artifactId>google-api-client</artifactId>
       <version>${google-clients.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableIO.java
deleted file mode 100644
index 28ff335..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableIO.java
+++ /dev/null
@@ -1,1016 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.bigtable;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.io.Sink.WriteOperation;
-import org.apache.beam.sdk.io.Sink.Writer;
-import org.apache.beam.sdk.io.range.ByteKey;
-import org.apache.beam.sdk.io.range.ByteKeyRange;
-import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.ReleaseInfo;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.RowFilter;
-import com.google.bigtable.v1.SampleRowKeysResponse;
-import com.google.cloud.bigtable.config.BigtableOptions;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import javax.annotation.Nullable;
-
-/**
- * A bounded source and sink for Google Cloud Bigtable.
- *
- * <p>For more information, see the online documentation at
- * <a href="https://cloud.google.com/bigtable/">Google Cloud Bigtable</a>.
- *
- * <h3>Reading from Cloud Bigtable</h3>
- *
- * <p>The Bigtable source returns a set of rows from a single table, returning a
- * {@code PCollection<Row>}.
- *
- * <p>To configure a Cloud Bigtable source, you must supply a table id and a {@link BigtableOptions}
- * or builder configured with the project and other information necessary to identify the
- * Bigtable cluster. A {@link RowFilter} may also optionally be specified using
- * {@link BigtableIO.Read#withRowFilter}. For example:
- *
- * <pre>{@code
- * BigtableOptions.Builder optionsBuilder =
- *     new BigtableOptions.Builder()
- *         .setProjectId("project")
- *         .setClusterId("cluster")
- *         .setZoneId("zone");
- *
- * Pipeline p = ...;
- *
- * // Scan the entire table.
- * p.apply("read",
- *     BigtableIO.read()
- *         .withBigtableOptions(optionsBuilder)
- *         .withTableId("table"));
- *
- * // Scan a subset of rows that match the specified row filter.
- * p.apply("filtered read",
- *     BigtableIO.read()
- *         .withBigtableOptions(optionsBuilder)
- *         .withTableId("table")
- *         .withRowFilter(filter));
- * }</pre>
- *
- * <h3>Writing to Cloud Bigtable</h3>
- *
- * <p>The Bigtable sink executes a set of row mutations on a single table. It takes as input a
- * {@link PCollection PCollection&lt;KV&lt;ByteString, Iterable&lt;Mutation&gt;&gt;&gt;}, where the
- * {@link ByteString} is the key of the row being mutated, and each {@link Mutation} represents an
- * idempotent transformation to that row.
- *
- * <p>To configure a Cloud Bigtable sink, you must supply a table id and a {@link BigtableOptions}
- * or builder configured with the project and other information necessary to identify the
- * Bigtable cluster, for example:
- *
- * <pre>{@code
- * BigtableOptions.Builder optionsBuilder =
- *     new BigtableOptions.Builder()
- *         .setProjectId("project")
- *         .setClusterId("cluster")
- *         .setZoneId("zone");
- *
- * PCollection<KV<ByteString, Iterable<Mutation>>> data = ...;
- *
- * data.apply("write",
- *     BigtableIO.write()
- *         .withBigtableOptions(optionsBuilder)
- *         .withTableId("table"));
- * }</pre>
- *
- * <h3>Experimental</h3>
- *
- * <p>This connector for Cloud Bigtable is considered experimental and may break or receive
- * backwards-incompatible changes in future versions of the Cloud Dataflow SDK. Cloud Bigtable is
- * in Beta, and thus it may introduce breaking changes in future revisions of its service or APIs.
- *
- * <h3>Permissions</h3>
- *
- * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
- * Dataflow job. Please refer to the documentation of corresponding
- * {@link PipelineRunner PipelineRunners} for more details.
- */
-@Experimental
-public class BigtableIO {
-  private static final Logger logger = LoggerFactory.getLogger(BigtableIO.class);
-
-  /**
-   * Creates an uninitialized {@link BigtableIO.Read}. Before use, the {@code Read} must be
-   * initialized with a
-   * {@link BigtableIO.Read#withBigtableOptions(BigtableOptions) BigtableOptions} that specifies
-   * the source Cloud Bigtable cluster, and a {@link BigtableIO.Read#withTableId tableId} that
-   * specifies which table to read. A {@link RowFilter} may also optionally be specified using
-   * {@link BigtableIO.Read#withRowFilter}.
-   */
-  @Experimental
-  public static Read read() {
-    return new Read(null, "", null, null);
-  }
-
-  /**
-   * Creates an uninitialized {@link BigtableIO.Write}. Before use, the {@code Write} must be
-   * initialized with a
-   * {@link BigtableIO.Write#withBigtableOptions(BigtableOptions) BigtableOptions} that specifies
-   * the destination Cloud Bigtable cluster, and a {@link BigtableIO.Write#withTableId tableId} that
-   * specifies which table to write.
-   */
-  @Experimental
-  public static Write write() {
-    return new Write(null, "", null);
-  }
-
-  /**
-   * A {@link PTransform} that reads from Google Cloud Bigtable. See the class-level Javadoc on
-   * {@link BigtableIO} for more information.
-   *
-   * @see BigtableIO
-   */
-  @Experimental
-  public static class Read extends PTransform<PBegin, PCollection<Row>> {
-    /**
-     * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster
-     * indicated by the given options, and using any other specified customizations.
-     *
-     * <p>Does not modify this object.
-     */
-    public Read withBigtableOptions(BigtableOptions options) {
-      checkNotNull(options, "options");
-      return withBigtableOptions(options.toBuilder());
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster
-     * indicated by the given options, and using any other specified customizations.
-     *
-     * <p>Clones the given {@link BigtableOptions} builder so that any further changes
-     * will have no effect on the returned {@link BigtableIO.Read}.
-     *
-     * <p>Does not modify this object.
-     */
-    public Read withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
-      checkNotNull(optionsBuilder, "optionsBuilder");
-      // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
-      BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
-      BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
-      return new Read(optionsWithAgent, tableId, filter, bigtableService);
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Read} that will filter the rows read from Cloud Bigtable
-     * using the given row filter.
-     *
-     * <p>Does not modify this object.
-     */
-    public Read withRowFilter(RowFilter filter) {
-      checkNotNull(filter, "filter");
-      return new Read(options, tableId, filter, bigtableService);
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Read} that will read from the specified table.
-     *
-     * <p>Does not modify this object.
-     */
-    public Read withTableId(String tableId) {
-      checkNotNull(tableId, "tableId");
-      return new Read(options, tableId, filter, bigtableService);
-    }
-
-    /**
-     * Returns the Google Cloud Bigtable cluster being read from, and other parameters.
-     */
-    public BigtableOptions getBigtableOptions() {
-      return options;
-    }
-
-    /**
-     * Returns the table being read from.
-     */
-    public String getTableId() {
-      return tableId;
-    }
-
-    @Override
-    public PCollection<Row> apply(PBegin input) {
-      BigtableSource source =
-          new BigtableSource(getBigtableService(), tableId, filter, ByteKeyRange.ALL_KEYS, null);
-      return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
-    }
-
-    @Override
-    public void validate(PBegin input) {
-      checkArgument(options != null, "BigtableOptions not specified");
-      checkArgument(!tableId.isEmpty(), "Table ID not specified");
-      try {
-        checkArgument(
-            getBigtableService().tableExists(tableId), "Table %s does not exist", tableId);
-      } catch (IOException e) {
-        logger.warn("Error checking whether table {} exists; proceeding.", tableId, e);
-      }
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-
-      builder.add("tableId", tableId);
-
-      if (options != null) {
-        builder.add("bigtableOptions", options.toString());
-      }
-
-      if (filter != null) {
-        builder.add("rowFilter", filter.toString());
-      }
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(Read.class)
-          .add("options", options)
-          .add("tableId", tableId)
-          .add("filter", filter)
-          .toString();
-    }
-
-    /////////////////////////////////////////////////////////////////////////////////////////
-    /**
-     * Used to define the Cloud Bigtable cluster and any options for the networking layer.
-     * Cannot actually be {@code null} at validation time, but may start out {@code null} while
-     * source is being built.
-     */
-    @Nullable private final BigtableOptions options;
-    private final String tableId;
-    @Nullable private final RowFilter filter;
-    @Nullable private final BigtableService bigtableService;
-
-    private Read(
-        @Nullable BigtableOptions options,
-        String tableId,
-        @Nullable RowFilter filter,
-        @Nullable BigtableService bigtableService) {
-      this.options = options;
-      this.tableId = checkNotNull(tableId, "tableId");
-      this.filter = filter;
-      this.bigtableService = bigtableService;
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Read} that will read using the given Cloud Bigtable
-     * service implementation.
-     *
-     * <p>This is used for testing.
-     *
-     * <p>Does not modify this object.
-     */
-    Read withBigtableService(BigtableService bigtableService) {
-      checkNotNull(bigtableService, "bigtableService");
-      return new Read(options, tableId, filter, bigtableService);
-    }
-
-    /**
-     * Helper function that either returns the mock Bigtable service supplied by
-     * {@link #withBigtableService} or creates and returns an implementation that talks to
-     * {@code Cloud Bigtable}.
-     */
-    private BigtableService getBigtableService() {
-      if (bigtableService != null) {
-        return bigtableService;
-      }
-      return new BigtableServiceImpl(options);
-    }
-  }
-
-  /**
-   * A {@link PTransform} that writes to Google Cloud Bigtable. See the class-level Javadoc on
-   * {@link BigtableIO} for more information.
-   *
-   * @see BigtableIO
-   */
-  @Experimental
-  public static class Write
-      extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, PDone> {
-    /**
-     * Used to define the Cloud Bigtable cluster and any options for the networking layer.
-     * Cannot actually be {@code null} at validation time, but may start out {@code null} while
-     * source is being built.
-     */
-    @Nullable private final BigtableOptions options;
-    private final String tableId;
-    @Nullable private final BigtableService bigtableService;
-
-    private Write(
-        @Nullable BigtableOptions options,
-        String tableId,
-        @Nullable BigtableService bigtableService) {
-      this.options = options;
-      this.tableId = checkNotNull(tableId, "tableId");
-      this.bigtableService = bigtableService;
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable cluster
-     * indicated by the given options, and using any other specified customizations.
-     *
-     * <p>Does not modify this object.
-     */
-    public Write withBigtableOptions(BigtableOptions options) {
-      checkNotNull(options, "options");
-      return withBigtableOptions(options.toBuilder());
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable cluster
-     * indicated by the given options, and using any other specified customizations.
-     *
-     * <p>Clones the given {@link BigtableOptions} builder so that any further changes
-     * will have no effect on the returned {@link BigtableIO.Write}.
-     *
-     * <p>Does not modify this object.
-     */
-    public Write withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
-      checkNotNull(optionsBuilder, "optionsBuilder");
-      // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
-      BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
-      BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
-      return new Write(optionsWithAgent, tableId, bigtableService);
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Write} that will write to the specified table.
-     *
-     * <p>Does not modify this object.
-     */
-    public Write withTableId(String tableId) {
-      checkNotNull(tableId, "tableId");
-      return new Write(options, tableId, bigtableService);
-    }
-
-    /**
-     * Returns the Google Cloud Bigtable cluster being written to, and other parameters.
-     */
-    public BigtableOptions getBigtableOptions() {
-      return options;
-    }
-
-    /**
-     * Returns the table being written to.
-     */
-    public String getTableId() {
-      return tableId;
-    }
-
-    @Override
-    public PDone apply(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
-      Sink sink = new Sink(tableId, getBigtableService());
-      return input.apply(org.apache.beam.sdk.io.Write.to(sink));
-    }
-
-    @Override
-    public void validate(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
-      checkArgument(options != null, "BigtableOptions not specified");
-      checkArgument(!tableId.isEmpty(), "Table ID not specified");
-      try {
-        checkArgument(
-            getBigtableService().tableExists(tableId), "Table %s does not exist", tableId);
-      } catch (IOException e) {
-        logger.warn("Error checking whether table {} exists; proceeding.", tableId, e);
-      }
-    }
-
-    /**
-     * Returns a new {@link BigtableIO.Write} that will write using the given Cloud Bigtable
-     * service implementation.
-     *
-     * <p>This is used for testing.
-     *
-     * <p>Does not modify this object.
-     */
-    Write withBigtableService(BigtableService bigtableService) {
-      checkNotNull(bigtableService, "bigtableService");
-      return new Write(options, tableId, bigtableService);
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-
-      builder.add("tableId", tableId);
-
-      if (options != null) {
-        builder.add("bigtableOptions", options.toString());
-      }
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(Write.class)
-          .add("options", options)
-          .add("tableId", tableId)
-          .toString();
-    }
-
-    /**
-     * Helper function that either returns the mock Bigtable service supplied by
-     * {@link #withBigtableService} or creates and returns an implementation that talks to
-     * {@code Cloud Bigtable}.
-     */
-    private BigtableService getBigtableService() {
-      if (bigtableService != null) {
-        return bigtableService;
-      }
-      return new BigtableServiceImpl(options);
-    }
-  }
-
-  //////////////////////////////////////////////////////////////////////////////////////////
-  /** Disallow construction of utility class. */
-  private BigtableIO() {}
-
-  static class BigtableSource extends BoundedSource<Row> {
-    public BigtableSource(
-        BigtableService service,
-        String tableId,
-        @Nullable RowFilter filter,
-        ByteKeyRange range,
-        Long estimatedSizeBytes) {
-      this.service = service;
-      this.tableId = tableId;
-      this.filter = filter;
-      this.range = range;
-      this.estimatedSizeBytes = estimatedSizeBytes;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(BigtableSource.class)
-          .add("tableId", tableId)
-          .add("filter", filter)
-          .add("range", range)
-          .add("estimatedSizeBytes", estimatedSizeBytes)
-          .toString();
-    }
-
-    ////// Private state and internal implementation details //////
-    private final BigtableService service;
-    @Nullable private final String tableId;
-    @Nullable private final RowFilter filter;
-    private final ByteKeyRange range;
-    @Nullable private Long estimatedSizeBytes;
-    @Nullable private transient List<SampleRowKeysResponse> sampleRowKeys;
-
-    protected BigtableSource withStartKey(ByteKey startKey) {
-      checkNotNull(startKey, "startKey");
-      return new BigtableSource(
-          service, tableId, filter, range.withStartKey(startKey), estimatedSizeBytes);
-    }
-
-    protected BigtableSource withEndKey(ByteKey endKey) {
-      checkNotNull(endKey, "endKey");
-      return new BigtableSource(
-          service, tableId, filter, range.withEndKey(endKey), estimatedSizeBytes);
-    }
-
-    protected BigtableSource withEstimatedSizeBytes(Long estimatedSizeBytes) {
-      checkNotNull(estimatedSizeBytes, "estimatedSizeBytes");
-      return new BigtableSource(service, tableId, filter, range, estimatedSizeBytes);
-    }
-
-    /**
-     * Makes an API call to the Cloud Bigtable service that gives information about tablet key
-     * boundaries and estimated sizes. We can use these samples to ensure that splits are on
-     * different tablets, and possibly generate sub-splits within tablets.
-     */
-    private List<SampleRowKeysResponse> getSampleRowKeys() throws IOException {
-      return service.getSampleRowKeys(this);
-    }
-
-    @Override
-    public List<BigtableSource> splitIntoBundles(
-        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-      // Update the desiredBundleSizeBytes in order to limit the
-      // number of splits to maximumNumberOfSplits.
-      long maximumNumberOfSplits = 4000;
-      long sizeEstimate = getEstimatedSizeBytes(options);
-      desiredBundleSizeBytes =
-          Math.max(sizeEstimate / maximumNumberOfSplits, desiredBundleSizeBytes);
-
-      // Delegate to testable helper.
-      return splitIntoBundlesBasedOnSamples(desiredBundleSizeBytes, getSampleRowKeys());
-    }
-
-    /** Helper that splits this source into bundles based on Cloud Bigtable sampled row keys. */
-    private List<BigtableSource> splitIntoBundlesBasedOnSamples(
-        long desiredBundleSizeBytes, List<SampleRowKeysResponse> sampleRowKeys) {
-      // There are no regions, or no samples available. Just scan the entire range.
-      if (sampleRowKeys.isEmpty()) {
-        logger.info("Not splitting source {} because no sample row keys are available.", this);
-        return Collections.singletonList(this);
-      }
-
-      logger.info(
-          "About to split into bundles of size {} with sampleRowKeys length {} first element {}",
-          desiredBundleSizeBytes,
-          sampleRowKeys.size(),
-          sampleRowKeys.get(0));
-
-      // Loop through all sampled responses and generate splits from the ones that overlap the
-      // scan range. The main complication is that we must track the end range of the previous
-      // sample to generate good ranges.
-      ByteKey lastEndKey = ByteKey.EMPTY;
-      long lastOffset = 0;
-      ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder();
-      for (SampleRowKeysResponse response : sampleRowKeys) {
-        ByteKey responseEndKey = ByteKey.of(response.getRowKey());
-        long responseOffset = response.getOffsetBytes();
-        checkState(
-            responseOffset >= lastOffset,
-            "Expected response byte offset %s to come after the last offset %s",
-            responseOffset,
-            lastOffset);
-
-        if (!range.overlaps(ByteKeyRange.of(lastEndKey, responseEndKey))) {
-          // This region does not overlap the scan, so skip it.
-          lastOffset = responseOffset;
-          lastEndKey = responseEndKey;
-          continue;
-        }
-
-        // Calculate the beginning of the split as the larger of startKey and the end of the last
-        // split. Unspecified start is smallest key so is correctly treated as earliest key.
-        ByteKey splitStartKey = lastEndKey;
-        if (splitStartKey.compareTo(range.getStartKey()) < 0) {
-          splitStartKey = range.getStartKey();
-        }
-
-        // Calculate the end of the split as the smaller of endKey and the end of this sample. Note
-        // that range.containsKey handles the case when range.getEndKey() is empty.
-        ByteKey splitEndKey = responseEndKey;
-        if (!range.containsKey(splitEndKey)) {
-          splitEndKey = range.getEndKey();
-        }
-
-        // We know this region overlaps the desired key range, and we know a rough estimate of its
-        // size. Split the key range into bundle-sized chunks and then add them all as splits.
-        long sampleSizeBytes = responseOffset - lastOffset;
-        List<BigtableSource> subSplits =
-            splitKeyRangeIntoBundleSizedSubranges(
-                sampleSizeBytes,
-                desiredBundleSizeBytes,
-                ByteKeyRange.of(splitStartKey, splitEndKey));
-        splits.addAll(subSplits);
-
-        // Move to the next region.
-        lastEndKey = responseEndKey;
-        lastOffset = responseOffset;
-      }
-
-      // We must add one more region after the end of the samples if both these conditions hold:
-      //  1. we did not scan to the end yet (lastEndKey is concrete, not 0-length).
-      //  2. we want to scan to the end (endKey is empty) or farther (lastEndKey < endKey).
-      if (!lastEndKey.isEmpty()
-          && (range.getEndKey().isEmpty() || lastEndKey.compareTo(range.getEndKey()) < 0)) {
-        splits.add(this.withStartKey(lastEndKey).withEndKey(range.getEndKey()));
-      }
-
-      List<BigtableSource> ret = splits.build();
-      logger.info("Generated {} splits. First split: {}", ret.size(), ret.get(0));
-      return ret;
-    }
-
-    @Override
-    public long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
-      // Delegate to testable helper.
-      if (estimatedSizeBytes == null) {
-        estimatedSizeBytes = getEstimatedSizeBytesBasedOnSamples(getSampleRowKeys());
-      }
-      return estimatedSizeBytes;
-    }
-
-    /**
-     * Computes the estimated size in bytes based on the total size of all samples that overlap
-     * the key range this source will scan.
-     */
-    private long getEstimatedSizeBytesBasedOnSamples(List<SampleRowKeysResponse> samples) {
-      long estimatedSizeBytes = 0;
-      long lastOffset = 0;
-      ByteKey currentStartKey = ByteKey.EMPTY;
-      // Compute the total estimated size as the size of each sample that overlaps the scan range.
-      // TODO: In future, Bigtable service may provide finer grained APIs, e.g., to sample given a
-      // filter or to sample on a given key range.
-      for (SampleRowKeysResponse response : samples) {
-        ByteKey currentEndKey = ByteKey.of(response.getRowKey());
-        long currentOffset = response.getOffsetBytes();
-        if (!currentStartKey.isEmpty() && currentStartKey.equals(currentEndKey)) {
-          // Skip an empty region.
-          lastOffset = currentOffset;
-          continue;
-        } else if (range.overlaps(ByteKeyRange.of(currentStartKey, currentEndKey))) {
-          estimatedSizeBytes += currentOffset - lastOffset;
-        }
-        currentStartKey = currentEndKey;
-        lastOffset = currentOffset;
-      }
-      return estimatedSizeBytes;
-    }
-
-    /**
-     * Cloud Bigtable returns query results ordered by key.
-     */
-    @Override
-    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
-      return true;
-    }
-
-    @Override
-    public BoundedReader<Row> createReader(PipelineOptions options) throws IOException {
-      return new BigtableReader(this, service);
-    }
-
-    @Override
-    public void validate() {
-      checkArgument(!tableId.isEmpty(), "tableId cannot be empty");
-    }
-
-    @Override
-    public Coder<Row> getDefaultOutputCoder() {
-      return ProtoCoder.of(Row.class);
-    }
-
-    /** Helper that splits the specified range in this source into bundles. */
-    private List<BigtableSource> splitKeyRangeIntoBundleSizedSubranges(
-        long sampleSizeBytes, long desiredBundleSizeBytes, ByteKeyRange range) {
-      // Catch the trivial cases. Split is small enough already, or this is the last region.
-      logger.debug(
-          "Subsplit for sampleSizeBytes {} and desiredBundleSizeBytes {}",
-          sampleSizeBytes,
-          desiredBundleSizeBytes);
-      if (sampleSizeBytes <= desiredBundleSizeBytes) {
-        return Collections.singletonList(
-            this.withStartKey(range.getStartKey()).withEndKey(range.getEndKey()));
-      }
-
-      checkArgument(
-          sampleSizeBytes > 0, "Sample size %s bytes must be greater than 0.", sampleSizeBytes);
-      checkArgument(
-          desiredBundleSizeBytes > 0,
-          "Desired bundle size %s bytes must be greater than 0.",
-          desiredBundleSizeBytes);
-
-      int splitCount = (int) Math.ceil(((double) sampleSizeBytes) / (desiredBundleSizeBytes));
-      List<ByteKey> splitKeys = range.split(splitCount);
-      ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder();
-      Iterator<ByteKey> keys = splitKeys.iterator();
-      ByteKey prev = keys.next();
-      while (keys.hasNext()) {
-        ByteKey next = keys.next();
-        splits.add(
-            this
-                .withStartKey(prev)
-                .withEndKey(next)
-                .withEstimatedSizeBytes(sampleSizeBytes / splitCount));
-        prev = next;
-      }
-      return splits.build();
-    }
-
-    public ByteKeyRange getRange() {
-      return range;
-    }
-
-    public RowFilter getRowFilter() {
-      return filter;
-    }
-
-    public String getTableId() {
-      return tableId;
-    }
-  }
-
-  /**
-   * A {@link BoundedReader} over a {@link BigtableSource}. Rows are fetched through a
-   * {@link BigtableService.Reader}; a {@link ByteKeyRangeTracker} over the source's range decides
-   * which rows may be returned and supports dynamic splitting via {@link #splitAtFraction}.
-   */
-  private static class BigtableReader extends BoundedReader<Row> {
-    // Thread-safety: source is protected via synchronization and is only accessed or modified
-    // inside a synchronized block (or constructor, which is the same).
-    private BigtableSource source;
-    private final BigtableService service;
-    private BigtableService.Reader reader;
-    private final ByteKeyRangeTracker rangeTracker;
-    private long recordsReturned;
-
-    public BigtableReader(BigtableSource source, BigtableService service) {
-      this.source = source;
-      this.service = service;
-      rangeTracker = ByteKeyRangeTracker.of(source.getRange());
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      reader = service.createReader(getCurrentSource());
-      return claimCurrentRow(reader.start());
-    }
-
-    @Override
-    public synchronized BigtableSource getCurrentSource() {
-      return source;
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      return claimCurrentRow(reader.advance());
-    }
-
-    /**
-     * Decides whether the row just produced by the underlying reader may be returned.
-     *
-     * @param hasRow the result of the underlying reader's most recent start() or advance()
-     * @return true if a row was produced and the range tracker accepted its key; only in that
-     *     case is the returned-record count incremented
-     */
-    private boolean claimCurrentRow(boolean hasRow) {
-      // Short-circuit: the current row is only inspected when the underlying reader produced one.
-      boolean hasRecord =
-          hasRow
-              && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey()));
-      if (hasRecord) {
-        ++recordsReturned;
-      }
-      return hasRecord;
-    }
-
-    @Override
-    public Row getCurrent() throws NoSuchElementException {
-      return reader.getCurrentRow();
-    }
-
-    @Override
-    public void close() throws IOException {
-      logger.info("Closing reader after reading {} records.", recordsReturned);
-      if (reader != null) {
-        reader.close();
-        reader = null;
-      }
-    }
-
-    @Override
-    public final Double getFractionConsumed() {
-      return rangeTracker.getFractionConsumed();
-    }
-
-    /**
-     * Attempts to split the current source at the key interpolated at {@code fraction} of its
-     * range. On success this reader keeps the primary part, which ends at the split key, and
-     * the residual part, which starts at the split key, is returned.
-     *
-     * @return the residual source, or {@code null} if no key could be interpolated or the range
-     *     tracker rejected the split
-     */
-    @Override
-    public final synchronized BigtableSource splitAtFraction(double fraction) {
-      ByteKey splitKey;
-      try {
-        splitKey = source.getRange().interpolateKey(fraction);
-      } catch (IllegalArgumentException e) {
-        // SLF4J substitutes "{}" placeholders; printf-style "%s" would be logged verbatim.
-        logger.info("{}: Failed to interpolate key for fraction {}.", source.getRange(), fraction);
-        return null;
-      }
-      logger.debug(
-          "Proposing to split {} at fraction {} (key {})", rangeTracker, fraction, splitKey);
-      if (!rangeTracker.trySplitAtPosition(splitKey)) {
-        return null;
-      }
-      BigtableSource primary = source.withEndKey(splitKey);
-      BigtableSource residual = source.withStartKey(splitKey);
-      this.source = primary;
-      return residual;
-    }
-  }
-
-  /**
-   * A {@link org.apache.beam.sdk.io.Sink} that writes row mutations to one Cloud Bigtable table
-   * through a {@link BigtableService}.
-   */
-  private static class Sink
-      extends org.apache.beam.sdk.io.Sink<KV<ByteString, Iterable<Mutation>>> {
-    private final String tableId;
-    private final BigtableService bigtableService;
-
-    /**
-     * Creates a sink for {@code tableId}.
-     *
-     * @throws NullPointerException if either argument is null
-     */
-    public Sink(String tableId, BigtableService bigtableService) {
-      this.tableId = checkNotNull(tableId, "tableId");
-      this.bigtableService = checkNotNull(bigtableService, "bigtableService");
-    }
-
-    /** Returns the id of the destination table. */
-    public String getTableId() {
-      return tableId;
-    }
-
-    /** Returns the service used to open writers. */
-    public BigtableService getBigtableService() {
-      return bigtableService;
-    }
-
-    @Override
-    public WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> createWriteOperation(
-        PipelineOptions options) {
-      return new BigtableWriteOperation(this);
-    }
-
-    /** Does nothing, as it is redundant with {@link Write#validate}. */
-    @Override
-    public void validate(PipelineOptions options) {}
-
-    @Override
-    public String toString() {
-      return MoreObjects
-          .toStringHelper(Sink.class)
-          .add("bigtableService", bigtableService)
-          .add("tableId", tableId)
-          .toString();
-    }
-  }
-
-  /**
-   * The {@link WriteOperation} for {@link Sink}: hands out {@link BigtableWriter}s and, on
-   * finalization, logs the total number of records they report having written.
-   */
-  private static class BigtableWriteOperation
-      extends WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> {
-    private final Sink sink;
-
-    public BigtableWriteOperation(Sink sink) {
-      this.sink = sink;
-    }
-
-    @Override
-    public Writer<KV<ByteString, Iterable<Mutation>>, Long> createWriter(PipelineOptions options)
-        throws Exception {
-      return new BigtableWriter(this);
-    }
-
-    /** No setup is performed before writers are created. */
-    @Override
-    public void initialize(PipelineOptions options) {}
-
-    /**
-     * Sums the per-writer record counts (the values returned by each writer's {@code close()})
-     * and logs the total at debug level.
-     */
-    @Override
-    public void finalize(Iterable<Long> writerResults, PipelineOptions options) {
-      long count = 0;
-      for (Long value : writerResults) {
-        count += value;
-      }
-      logger.debug("Wrote {} elements to BigtableIO.Sink {}", count, sink);
-    }
-
-    @Override
-    public Sink getSink() {
-      return sink;
-    }
-
-    @Override
-    public Coder<Long> getWriterResultCoder() {
-      return VarLongCoder.of();
-    }
-  }
-
-  /**
-   * Writes row mutations asynchronously through a {@link BigtableService.Writer}. Failed writes
-   * are queued by callbacks and surfaced as an {@link IOException} on the next {@code write} or
-   * on {@code close}. {@code close} returns the number of records submitted.
-   */
-  private static class BigtableWriter extends Writer<KV<ByteString, Iterable<Mutation>>, Long> {
-    private final BigtableWriteOperation writeOperation;
-    private final Sink sink;
-    private BigtableService.Writer bigtableWriter;
-    private long recordsWritten;
-    // Filled from write callbacks; drained only by checkForFailures().
-    private final ConcurrentLinkedQueue<BigtableWriteException> failures;
-
-    public BigtableWriter(BigtableWriteOperation writeOperation) {
-      this.writeOperation = writeOperation;
-      this.sink = writeOperation.getSink();
-      this.failures = new ConcurrentLinkedQueue<>();
-    }
-
-    @Override
-    public void open(String uId) throws Exception {
-      bigtableWriter = sink.getBigtableService().openForWriting(sink.getTableId());
-      recordsWritten = 0;
-    }
-
-    /**
-     * Fails the bundle with a descriptive error if any asynchronous write has failed. The message
-     * includes the details of at most the first 10 queued failures.
-     */
-    private void checkForFailures() throws IOException {
-      // This is the only consumer of failures and is never called concurrently, so the
-      // isEmpty()/remove() sequence below is safe even while callbacks keep adding entries.
-      if (failures.isEmpty()) {
-        return;
-      }
-
-      StringBuilder details = new StringBuilder();
-      int reported = 0;
-      while (reported < 10 && !failures.isEmpty()) {
-        BigtableWriteException failure = failures.remove();
-        details.append("\n").append(failure.getMessage());
-        Throwable cause = failure.getCause();
-        if (cause != null) {
-          details.append(": ").append(cause.getMessage());
-        }
-        reported++;
-      }
-      String message =
-          String.format(
-              "At least %d errors occurred writing to Bigtable. First %d errors: %s",
-              reported + failures.size(),
-              reported,
-              details.toString());
-      logger.error(message);
-      throw new IOException(message);
-    }
-
-    @Override
-    public void write(KV<ByteString, Iterable<Mutation>> rowMutations) throws Exception {
-      checkForFailures();
-      Futures.addCallback(
-          bigtableWriter.writeRecord(rowMutations),
-          new WriteExceptionCallback(rowMutations));
-      recordsWritten++;
-    }
-
-    @Override
-    public Long close() throws Exception {
-      // Closing the underlying writer first lets any last failures be queued before the check.
-      bigtableWriter.close();
-      bigtableWriter = null;
-      checkForFailures();
-      logger.info("Wrote {} records", recordsWritten);
-      return recordsWritten;
-    }
-
-    @Override
-    public WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> getWriteOperation() {
-      return writeOperation;
-    }
-
-    /** Records a failed write of {@code record} in {@link #failures}; successes are ignored. */
-    private class WriteExceptionCallback implements FutureCallback<Empty> {
-      private final KV<ByteString, Iterable<Mutation>> record;
-
-      public WriteExceptionCallback(KV<ByteString, Iterable<Mutation>> record) {
-        this.record = record;
-      }
-
-      @Override
-      public void onFailure(Throwable cause) {
-        failures.add(new BigtableWriteException(record, cause));
-      }
-
-      @Override
-      public void onSuccess(Empty produced) {}
-    }
-  }
-
-  /**
-   * An exception that puts information about the failed record being written in its message.
-   */
-  static class BigtableWriteException extends IOException {
-    public BigtableWriteException(KV<ByteString, Iterable<Mutation>> record, Throwable cause) {
-      super(
-          String.format(
-              "Error mutating row %s with mutations %s",
-              record.getKey().toStringUtf8(),
-              record.getValue()),
-          cause);
-    }
-  }
-
-  /**
-   * Builds the user agent string for Cloud Bigtable requests, in the form
-   * {@code <name>/<version> (<java specification version>); <client version>}. Name and version
-   * come from {@link ReleaseInfo}.
-   */
-  private static String getUserAgent() {
-    String javaSpecVersion = System.getProperty("java.specification.version");
-    ReleaseInfo releaseInfo = ReleaseInfo.getReleaseInfo();
-    // TODO get Bigtable client version directly from jar.
-    String clientVersion = "0.2.3";
-    return String.format(
-        "%s/%s (%s); %s",
-        releaseInfo.getName(), releaseInfo.getVersion(), javaSpecVersion, clientVersion);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableService.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableService.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableService.java
deleted file mode 100644
index cfae0ef..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableService.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.bigtable;
-
-import org.apache.beam.sdk.io.bigtable.BigtableIO.BigtableSource;
-import org.apache.beam.sdk.values.KV;
-
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.SampleRowKeysResponse;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * An interface for real or fake implementations of Cloud Bigtable.
- *
- * <p>Implementations are {@link Serializable} so that they can be shipped along with the
- * transforms that use them.
- */
-interface BigtableService extends Serializable {
-
-  /**
-   * The interface of a class that can write to Cloud Bigtable.
-   */
-  interface Writer {
-    /**
-     * Writes a single row transaction to Cloud Bigtable. The key of the {@code record} is the
-     * row key to be mutated and the iterable of mutations represent the changes to be made to the
-     * row.
-     *
-     * @return a future for the submitted write; a failed write is reported through this future
-     * @throws IOException if there is an error submitting the write.
-     */
-    ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record)
-        throws IOException;
-
-    /**
-     * Closes the writer.
-     *
-     * @throws IOException if any writes did not succeed
-     */
-    void close() throws IOException;
-  }
-
-  /**
-   * The interface of a class that reads from Cloud Bigtable.
-   */
-  interface Reader {
-    /**
-     * Reads the first element (including initialization, such as opening a network connection) and
-     * returns true if an element was found.
-     */
-    boolean start() throws IOException;
-
-    /**
-     * Attempts to read the next element, and returns true if an element has been read.
-     */
-    boolean advance() throws IOException;
-
-    /**
-     * Closes the reader.
-     *
-     * @throws IOException if there is an error.
-     */
-    void close() throws IOException;
-
-    /**
-     * Returns the last row read by a successful start() or advance().
-     *
-     * @throws NoSuchElementException if there is no current row because the last such call was
-     *     unsuccessful
-     */
-    Row getCurrentRow() throws NoSuchElementException;
-  }
-
-  /**
-   * Returns {@code true} if the table with the given id exists.
-   */
-  boolean tableExists(String tableId) throws IOException;
-
-  /**
-   * Returns a {@link Reader} that will read from the specified source.
-   */
-  Reader createReader(BigtableSource source) throws IOException;
-
-  /**
-   * Returns a {@link Writer} that will write to the specified table.
-   */
-  Writer openForWriting(String tableId) throws IOException;
-
-  /**
-   * Returns row keys sampled from the source's table. These contain information about the
-   * distribution of keys within the table.
-   */
-  List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableServiceImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableServiceImpl.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableServiceImpl.java
deleted file mode 100644
index 87651f2..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableServiceImpl.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.bigtable;
-
-import org.apache.beam.sdk.io.bigtable.BigtableIO.BigtableSource;
-import org.apache.beam.sdk.values.KV;
-
-import com.google.bigtable.admin.table.v1.GetTableRequest;
-import com.google.bigtable.v1.MutateRowRequest;
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.ReadRowsRequest;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.RowRange;
-import com.google.bigtable.v1.SampleRowKeysRequest;
-import com.google.bigtable.v1.SampleRowKeysResponse;
-import com.google.cloud.bigtable.config.BigtableOptions;
-import com.google.cloud.bigtable.grpc.BigtableSession;
-import com.google.cloud.bigtable.grpc.async.AsyncExecutor;
-import com.google.cloud.bigtable.grpc.async.HeapSizeManager;
-import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
-import com.google.common.base.MoreObjects;
-import com.google.common.io.Closer;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
-
-import io.grpc.Status.Code;
-import io.grpc.StatusRuntimeException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * An implementation of {@link BigtableService} that actually communicates with the Cloud Bigtable
- * service.
- */
-class BigtableServiceImpl implements BigtableService {
-  private static final Logger logger = LoggerFactory.getLogger(BigtableService.class);
-
-  public BigtableServiceImpl(BigtableOptions options) {
-    this.options = options;
-  }
-
-  private final BigtableOptions options;
-
-  @Override
-  public BigtableWriterImpl openForWriting(String tableId) throws IOException {
-    BigtableSession session = new BigtableSession(options);
-    String tableName = options.getClusterName().toTableNameStr(tableId);
-    return new BigtableWriterImpl(session, tableName);
-  }
-
-  @Override
-  public boolean tableExists(String tableId) throws IOException {
-    if (!BigtableSession.isAlpnProviderEnabled()) {
-      logger.info(
-          "Skipping existence check for table {} (BigtableOptions {}) because ALPN is not"
-              + " configured.",
-          tableId,
-          options);
-      return true;
-    }
-
-    try (BigtableSession session = new BigtableSession(options)) {
-      GetTableRequest getTable =
-          GetTableRequest.newBuilder()
-              .setName(options.getClusterName().toTableNameStr(tableId))
-              .build();
-      session.getTableAdminClient().getTable(getTable);
-      return true;
-    } catch (StatusRuntimeException e) {
-      if (e.getStatus().getCode() == Code.NOT_FOUND) {
-        return false;
-      }
-      String message =
-          String.format(
-              "Error checking whether table %s (BigtableOptions %s) exists", tableId, options);
-      logger.error(message, e);
-      throw new IOException(message, e);
-    }
-  }
-
-  private class BigtableReaderImpl implements Reader {
-    private BigtableSession session;
-    private final BigtableSource source;
-    private ResultScanner<Row> results;
-    private Row currentRow;
-
-    public BigtableReaderImpl(BigtableSession session, BigtableSource source) {
-      this.session = session;
-      this.source = source;
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      RowRange range =
-          RowRange.newBuilder()
-              .setStartKey(source.getRange().getStartKey().getValue())
-              .setEndKey(source.getRange().getEndKey().getValue())
-              .build();
-      ReadRowsRequest.Builder requestB =
-          ReadRowsRequest.newBuilder()
-              .setRowRange(range)
-              .setTableName(options.getClusterName().toTableNameStr(source.getTableId()));
-      if (source.getRowFilter() != null) {
-        requestB.setFilter(source.getRowFilter());
-      }
-      results = session.getDataClient().readRows(requestB.build());
-      return advance();
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      currentRow = results.next();
-      return (currentRow != null);
-    }
-
-    @Override
-    public void close() throws IOException {
-      // Goal: by the end of this function, both results and session are null and closed,
-      // independent of what errors they throw or prior state.
-
-      if (session == null) {
-        // Only possible when previously closed, so we know that results is also null.
-        return;
-      }
-
-      // Session does not implement Closeable -- it's AutoCloseable. So we can't register it with
-      // the Closer, but we can use the Closer to simplify the error handling.
-      try (Closer closer = Closer.create()) {
-        if (results != null) {
-          closer.register(results);
-          results = null;
-        }
-
-        session.close();
-      } finally {
-        session = null;
-      }
-    }
-
-    @Override
-    public Row getCurrentRow() throws NoSuchElementException {
-      if (currentRow == null) {
-        throw new NoSuchElementException();
-      }
-      return currentRow;
-    }
-  }
-
-  private static class BigtableWriterImpl implements Writer {
-    private BigtableSession session;
-    private AsyncExecutor executor;
-    private final MutateRowRequest.Builder partialBuilder;
-
-    public BigtableWriterImpl(BigtableSession session, String tableName) {
-      this.session = session;
-      this.executor =
-          new AsyncExecutor(
-              session.getDataClient(),
-              new HeapSizeManager(
-                  AsyncExecutor.ASYNC_MUTATOR_MAX_MEMORY_DEFAULT,
-                  AsyncExecutor.MAX_INFLIGHT_RPCS_DEFAULT));
-
-      partialBuilder = MutateRowRequest.newBuilder().setTableName(tableName);
-    }
-
-    @Override
-    public void close() throws IOException {
-      try {
-        if (executor != null) {
-          executor.flush();
-          executor = null;
-        }
-      } finally {
-        if (session != null) {
-          session.close();
-          session = null;
-        }
-      }
-    }
-
-    @Override
-    public ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record)
-        throws IOException {
-      MutateRowRequest r =
-          partialBuilder
-              .clone()
-              .setRowKey(record.getKey())
-              .addAllMutations(record.getValue())
-              .build();
-      try {
-        return executor.mutateRowAsync(r);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new IOException("Write interrupted", e);
-      }
-    }
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects
-        .toStringHelper(BigtableServiceImpl.class)
-        .add("options", options)
-        .toString();
-  }
-
-  @Override
-  public Reader createReader(BigtableSource source) throws IOException {
-    BigtableSession session = new BigtableSession(options);
-    return new BigtableReaderImpl(session, source);
-  }
-
-  @Override
-  public List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) throws IOException {
-    try (BigtableSession session = new BigtableSession(options)) {
-      SampleRowKeysRequest request =
-          SampleRowKeysRequest.newBuilder()
-              .setTableName(options.getClusterName().toTableNameStr(source.getTableId()))
-              .build();
-      return session.getDataClient().sampleRowKeys(request);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/package-info.java
deleted file mode 100644
index f094cd4..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Defines transforms for reading and writing from Google Cloud Bigtable.
- *
- * @see org.apache.beam.sdk.io.bigtable.BigtableIO
- */
-package org.apache.beam.sdk.io.bigtable;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/core/src/test/java/org/apache/beam/sdk/io/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/bigtable/BigtableIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/bigtable/BigtableIOTest.java
deleted file mode 100644
index 7c176e4..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/bigtable/BigtableIOTest.java
+++ /dev/null
@@ -1,729 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.bigtable;
-
-import static org.apache.beam.sdk.testing.SourceTestUtils.assertSourcesEqualReferenceSource;
-import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive;
-import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
-import static org.apache.beam.sdk.testing.SourceTestUtils
-    .assertSplitAtFractionSucceedsAndConsistent;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Verify.verifyNotNull;
-
-import static org.hamcrest.Matchers.hasSize;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.bigtable.BigtableIO.BigtableSource;
-import org.apache.beam.sdk.io.range.ByteKey;
-import org.apache.beam.sdk.io.range.ByteKeyRange;
-import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-import com.google.bigtable.v1.Cell;
-import com.google.bigtable.v1.Column;
-import com.google.bigtable.v1.Family;
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.Mutation.SetCell;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.RowFilter;
-import com.google.bigtable.v1.SampleRowKeysResponse;
-import com.google.cloud.bigtable.config.BigtableOptions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
-
-import org.hamcrest.Matchers;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import javax.annotation.Nullable;
-
-/**
- * Unit tests for {@link BigtableIO}.
- */
-@RunWith(JUnit4.class)
-public class BigtableIOTest {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-  // Captures log output from BigtableIO so tests can assert on reader/writer log lines.
-  @Rule public ExpectedLogs logged = ExpectedLogs.none(BigtableIO.class);
-
-  /**
-   * These tests require a static instance of the {@link FakeBigtableService} because the writers
-   * go through a serialization step when executing the test and would not affect passed-in objects
-   * otherwise.
-   */
-  private static FakeBigtableService service;
-  // Shared options for project "project", cluster "cluster", zone "zone".
-  private static final BigtableOptions BIGTABLE_OPTIONS =
-      new BigtableOptions.Builder()
-          .setProjectId("project")
-          .setClusterId("cluster")
-          .setZoneId("zone")
-          .build();
-  // Rebound to the fresh fake service in setup() before every test.
-  private static BigtableIO.Read defaultRead =
-      BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS);
-  private static BigtableIO.Write defaultWrite =
-      BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS);
-  // Coder resolved from the pipeline's registry for the write element type.
-  private Coder<KV<ByteString, Iterable<Mutation>>> bigtableCoder;
-  private static final TypeDescriptor<KV<ByteString, Iterable<Mutation>>> BIGTABLE_WRITE_TYPE =
-      new TypeDescriptor<KV<ByteString, Iterable<Mutation>>>() {};
-
-  @Before
-  public void setup() throws Exception {
-    service = new FakeBigtableService();
-    defaultRead = defaultRead.withBigtableService(service);
-    defaultWrite = defaultWrite.withBigtableService(service);
-    bigtableCoder = TestPipeline.create().getCoderRegistry().getCoder(BIGTABLE_WRITE_TYPE);
-  }
-
-  /** Options set before the table id are preserved on the built read. */
-  @Test
-  public void testReadBuildsCorrectly() {
-    BigtableIO.Read read =
-        BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS).withTableId("table");
-    BigtableOptions builtOptions = read.getBigtableOptions();
-    assertEquals("project", builtOptions.getProjectId());
-    assertEquals("cluster", builtOptions.getClusterId());
-    assertEquals("zone", builtOptions.getZoneId());
-    assertEquals("table", read.getTableId());
-  }
-
-  /** Options set after the table id are preserved on the built read. */
-  @Test
-  public void testReadBuildsCorrectlyInDifferentOrder() {
-    BigtableIO.Read read =
-        BigtableIO.read().withTableId("table").withBigtableOptions(BIGTABLE_OPTIONS);
-    BigtableOptions builtOptions = read.getBigtableOptions();
-    assertEquals("project", builtOptions.getProjectId());
-    assertEquals("cluster", builtOptions.getClusterId());
-    assertEquals("zone", builtOptions.getZoneId());
-    assertEquals("table", read.getTableId());
-  }
-
-  /** Options set before the table id are preserved on the built write. */
-  @Test
-  public void testWriteBuildsCorrectly() {
-    BigtableIO.Write write =
-        BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS).withTableId("table");
-    assertEquals("table", write.getTableId());
-    BigtableOptions builtOptions = write.getBigtableOptions();
-    assertEquals("project", builtOptions.getProjectId());
-    assertEquals("zone", builtOptions.getZoneId());
-    assertEquals("cluster", builtOptions.getClusterId());
-  }
-
-  /** Options set after the table id are preserved on the built write. */
-  @Test
-  public void testWriteBuildsCorrectlyInDifferentOrder() {
-    BigtableIO.Write write =
-        BigtableIO.write().withTableId("table").withBigtableOptions(BIGTABLE_OPTIONS);
-    BigtableOptions builtOptions = write.getBigtableOptions();
-    assertEquals("cluster", builtOptions.getClusterId());
-    assertEquals("project", builtOptions.getProjectId());
-    assertEquals("zone", builtOptions.getZoneId());
-    assertEquals("table", write.getTableId());
-  }
-
-  /** Validating a write that has options but no table id throws IllegalArgumentException. */
-  @Test
-  public void testWriteValidationFailsMissingTable() {
-    BigtableIO.Write writeWithoutTable =
-        BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS);
-    thrown.expect(IllegalArgumentException.class);
-    writeWithoutTable.validate(null);
-  }
-
-  /** Validating a write that has a table id but no options throws IllegalArgumentException. */
-  @Test
-  public void testWriteValidationFailsMissingOptions() {
-    BigtableIO.Write writeWithoutOptions = BigtableIO.write().withTableId("table");
-    thrown.expect(IllegalArgumentException.class);
-    writeWithoutOptions.validate(null);
-  }
-
-  /**
-   * Builds a single-row write: row {@code key} receives one SetCell mutation whose value is
-   * {@code value}. Both strings are encoded as UTF-8.
-   */
-  private static KV<ByteString, Iterable<Mutation>> makeWrite(String key, String value) {
-    SetCell.Builder cell = SetCell.newBuilder().setValue(ByteString.copyFromUtf8(value));
-    Mutation mutation = Mutation.newBuilder().setSetCell(cell).build();
-    Iterable<Mutation> mutations = ImmutableList.of(mutation);
-    return KV.of(ByteString.copyFromUtf8(key), mutations);
-  }
-
-  /** Builds a single-row write for {@code key} whose only mutation is empty (no SetCell). */
-  private static KV<ByteString, Iterable<Mutation>> makeBadWrite(String key) {
-    ByteString rowKey = ByteString.copyFromUtf8(key);
-    Iterable<Mutation> emptyMutations = ImmutableList.of(Mutation.newBuilder().build());
-    return KV.of(rowKey, emptyMutations);
-  }
-
-  /** Applying a read of a table that was never created fails with IllegalArgumentException. */
-  @Test
-  public void testReadingFailsTableDoesNotExist() throws Exception {
-    final String table = "TEST-TABLE";
-    BigtableIO.Read readOfMissingTable =
-        BigtableIO.read()
-            .withBigtableOptions(BIGTABLE_OPTIONS)
-            .withTableId(table)
-            .withBigtableService(service);
-
-    // read.validate() runs when the read is applied, and it is expected to throw.
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(String.format("Table %s does not exist", table));
-
-    TestPipeline pipeline = TestPipeline.create();
-    pipeline.apply(readOfMissingTable);
-  }
-
-  /** Tests that reading from an existing but empty table succeeds and yields no rows. */
-  @Test
-  public void testReadingEmptyTable() throws Exception {
-    final String table = "TEST-EMPTY-TABLE";
-    service.createTable(table);
-
-    TestPipeline p = TestPipeline.create();
-    PCollection<Row> rows = p.apply(defaultRead.withTableId(table));
-    PAssert.that(rows).empty();
-
-    p.run();
-    logged.verifyInfo("Closing reader after reading 0 records.");
-  }
-
-  /** Tests that a read over a populated table returns every row. */
-  @Test
-  public void testReading() throws Exception {
-    final String tableId = "TEST-MANY-ROWS-TABLE";
-    final int rowCount = 1001;
-    List<Row> expectedRows = makeTableData(tableId, rowCount);
-
-    TestPipeline pipeline = TestPipeline.create();
-    PAssert.that(pipeline.apply(defaultRead.withTableId(tableId)))
-        .containsInAnyOrder(expectedRows);
-    pipeline.run();
-
-    // The reader is expected to log how many records it produced when it is closed.
-    logged.verifyInfo(String.format("Closing reader after reading %d records.", rowCount));
-  }
-
-  /**
-   * A {@link Predicate} that accepts a row key when its UTF-8 decoding fully matches a regex.
-   *
-   * <p>The pattern is compiled once at construction rather than on every call, because the fake
-   * reader evaluates this predicate for every row it scans.
-   */
-  private static class KeyMatchesRegex implements Predicate<ByteString> {
-    private final java.util.regex.Pattern pattern;
-
-    public KeyMatchesRegex(String regex) {
-      this.pattern = java.util.regex.Pattern.compile(regex);
-    }
-
-    @Override
-    public boolean apply(@Nullable ByteString input) {
-      verifyNotNull(input, "input");
-      // Matcher.matches() requires the whole key to match, exactly like String.matches(regex).
-      return pattern.matcher(input.toStringUtf8()).matches();
-    }
-  }
-
-  /** Tests that a read with a row-key regex filter returns exactly the matching rows. */
-  @Test
-  public void testReadingWithFilter() throws Exception {
-    final String tableId = "TEST-FILTER-TABLE";
-    final int rowCount = 1001;
-    final String keyRegex = ".*17.*";
-    List<Row> allRows = makeTableData(tableId, rowCount);
-
-    // Compute the expected output locally, using the same regex the RowFilter carries.
-    final KeyMatchesRegex matchesKey = new KeyMatchesRegex(keyRegex);
-    Predicate<Row> rowMatches =
-        new Predicate<Row>() {
-          @Override
-          public boolean apply(@Nullable Row row) {
-            verifyNotNull(row, "input");
-            return matchesKey.apply(row.getKey());
-          }
-        };
-    Iterable<Row> expectedRows = Iterables.filter(allRows, rowMatches);
-
-    RowFilter keyFilter = RowFilter.newBuilder()
-        .setRowKeyRegexFilter(ByteString.copyFromUtf8(keyRegex))
-        .build();
-
-    TestPipeline pipeline = TestPipeline.create();
-    PCollection<Row> actualRows =
-        pipeline.apply(defaultRead.withTableId(tableId).withRowFilter(keyFilter));
-    PAssert.that(actualRows).containsInAnyOrder(expectedRows);
-
-    pipeline.run();
-  }
-
-  /**
-   * Exhaustively checks dynamic work rebalancing with {@code assertSplitAtFractionExhaustive}.
-   *
-   * <p>Disabled by default because it is very slow; re-enable it after changing the
-   * {@link BigtableIO.Read} implementation.
-   */
-  @Ignore("Slow. Rerun when changing the implementation.")
-  @Test
-  public void testReadingSplitAtFractionExhaustive() throws Exception {
-    final String tableId = "TEST-FEW-ROWS-SPLIT-EXHAUSTIVE-TABLE";
-    makeTableData(tableId, 10 /* rows */);
-    service.setupSampleRowKeys(tableId, 1 /* samples */, 1L /* bytes per row */);
-
-    ByteKeyRange tableRange = service.getTableRange(tableId);
-    BigtableSource source = new BigtableSource(service, tableId, null, tableRange, null);
-    assertSplitAtFractionExhaustive(source, null);
-  }
-
-  /** Unit tests of {@code splitAtFraction} at several read positions and split fractions. */
-  @Test
-  public void testReadingSplitAtFraction() throws Exception {
-    final String tableId = "TEST-SPLIT-AT-FRACTION";
-    makeTableData(tableId, 10 /* rows */);
-    service.setupSampleRowKeys(tableId, 1 /* samples */, 1L /* bytes per row */);
-
-    ByteKeyRange tableRange = service.getTableRange(tableId);
-    BigtableSource source = new BigtableSource(service, tableId, null, tableRange, null);
-
-    // Nothing read yet: every split request is rejected.
-    assertSplitAtFractionFails(source, 0, 0.1, null /* options */);
-    assertSplitAtFractionFails(source, 0, 1.0, null /* options */);
-    // One item read: splits at later fractions succeed.
-    assertSplitAtFractionSucceedsAndConsistent(source, 1, 0.333, null /* options */);
-    assertSplitAtFractionSucceedsAndConsistent(source, 1, 0.666, null /* options */);
-    // Three items read: a split behind the current position fails; later ones succeed.
-    assertSplitAtFractionFails(source, 3, 0.2, null /* options */);
-    assertSplitAtFractionSucceedsAndConsistent(source, 3, 0.571, null /* options */);
-    assertSplitAtFractionSucceedsAndConsistent(source, 3, 0.9, null /* options */);
-    // Six items read: the same pattern, further along the table.
-    assertSplitAtFractionFails(source, 6, 0.5, null /* options */);
-    assertSplitAtFractionSucceedsAndConsistent(source, 6, 0.7, null /* options */);
-  }
-
-  /** Tests reading all rows from a split table. */
-  @Test
-  public void testReadingWithSplits() throws Exception {
-    final String table = "TEST-MANY-ROWS-SPLITS-TABLE";
-    final int numRows = 1500;
-    final int numSamples = 10;
-    final long bytesPerRow = 100L;
-
-    // Set up test table data and sample row keys for size estimation and splitting.
-    makeTableData(table, numRows);
-    service.setupSampleRowKeys(table, numSamples, bytesPerRow);
-
-    // Generate source and split it.
-    BigtableSource source =
-        new BigtableSource(service, table, null /*filter*/, ByteKeyRange.ALL_KEYS, null /*size*/);
-    List<BigtableSource> splits =
-        source.splitIntoBundles(numRows * bytesPerRow / numSamples, null /* options */);
-
-    // Test num splits and split equality.
-    assertThat(splits, hasSize(numSamples));
-    assertSourcesEqualReferenceSource(source, splits, null /* options */);
-  }
-
-  /**
-   * Tests reading all rows from a table split into more bundles than there are row-key samples.
-   */
-  @Test
-  public void testReadingWithSubSplits() throws Exception {
-    // Use a table name distinct from testReadingWithSplits so neither test depends on the
-    // other's fake-table state.
-    final String table = "TEST-MANY-ROWS-SUB-SPLITS-TABLE";
-    final int numRows = 1000;
-    final int numSamples = 10;
-    final int numSplits = 20;
-    final long bytesPerRow = 100L;
-
-    // Set up test table data and sample row keys for size estimation and splitting.
-    makeTableData(table, numRows);
-    service.setupSampleRowKeys(table, numSamples, bytesPerRow);
-
-    // Generate source and split it.
-    BigtableSource source =
-        new BigtableSource(service, table, null /*filter*/, ByteKeyRange.ALL_KEYS, null /*size*/);
-    List<BigtableSource> splits =
-        source.splitIntoBundles(numRows * bytesPerRow / numSplits, null /* options */);
-
-    // Expect exactly numSplits bundles whose combined output matches the unsplit source.
-    assertThat(splits, hasSize(numSplits));
-    assertSourcesEqualReferenceSource(source, splits, null /* options */);
-  }
-
-  /**
-   * Tests reading a row-key-filtered table split into more bundles than there are row-key
-   * samples.
-   */
-  @Test
-  public void testReadingWithFilterAndSubSplits() throws Exception {
-    final String table = "TEST-FILTER-SUB-SPLITS";
-    final int numRows = 1700;
-    final int numSamples = 10;
-    final int numSplits = 20;
-    final long bytesPerRow = 100L;
-
-    // Set up test table data and sample row keys for size estimation and splitting.
-    makeTableData(table, numRows);
-    service.setupSampleRowKeys(table, numSamples, bytesPerRow);
-
-    // Generate a filtered source and split it; the filter is carried into every split.
-    RowFilter filter =
-        RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(".*17.*")).build();
-    BigtableSource source =
-        new BigtableSource(service, table, filter, ByteKeyRange.ALL_KEYS, null /*size*/);
-    List<BigtableSource> splits = source.splitIntoBundles(numRows * bytesPerRow / numSplits, null);
-
-    // Expect exactly numSplits bundles whose combined output matches the filtered unsplit source.
-    assertThat(splits, hasSize(numSplits));
-    assertSourcesEqualReferenceSource(source, splits, null /* options */);
-  }
-
-  /** Tests that a configured read exposes its table, row filter and options as display data. */
-  @Test
-  public void testReadingDisplayData() {
-    RowFilter rowFilter =
-        RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8("foo.*")).build();
-
-    BigtableIO.Read configured = BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS);
-    configured = configured.withTableId("fooTable").withRowFilter(rowFilter);
-    DisplayData items = DisplayData.from(configured);
-
-    assertThat(items, hasDisplayItem("tableId", "fooTable"));
-    assertThat(items, hasDisplayItem("rowFilter", rowFilter.toString()));
-    // BigtableIO adds a user agent to the options, so only the key is asserted, not the value.
-    assertThat(items, hasDisplayItem(hasKey("bigtableOptions")));
-  }
-
-  /** Tests that one record is written through to the fake service and the write is logged. */
-  @Test
-  public void testWriting() throws Exception {
-    final String tableId = "table";
-    final String key = "key";
-    final String value = "value";
-    service.createTable(tableId);
-
-    TestPipeline pipeline = TestPipeline.create();
-    PCollection<KV<ByteString, Iterable<Mutation>>> input =
-        pipeline.apply("single row", Create.of(makeWrite(key, value)).withCoder(bigtableCoder));
-    input.apply("write", defaultWrite.withTableId(tableId));
-    pipeline.run();
-
-    logged.verifyInfo("Wrote 1 records");
-
-    // Exactly one table exists, holding exactly the one row that was written.
-    assertEquals(1, service.tables.size());
-    Map<ByteString, ByteString> writtenRows = service.getTable(tableId);
-    assertNotNull(writtenRows);
-    assertEquals(1, writtenRows.size());
-    assertEquals(ByteString.copyFromUtf8(value), writtenRows.get(ByteString.copyFromUtf8(key)));
-  }
-
-  /** Tests that applying a write to a table that was never created fails validation. */
-  @Test
-  public void testWritingFailsTableDoesNotExist() throws Exception {
-    final String missingTable = "TEST-TABLE";
-    TestPipeline pipeline = TestPipeline.create();
-    PCollection<KV<ByteString, Iterable<Mutation>>> noRecords =
-        pipeline.apply(Create.<KV<ByteString, Iterable<Mutation>>>of());
-
-    // write.validate() runs at apply time, so the apply() below is what should throw.
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(String.format("Table %s does not exist", missingTable));
-
-    BigtableIO.Write write = defaultWrite.withTableId(missingTable);
-    noRecords.apply("write", write);
-  }
-
-  /** Tests that when writing an element fails, the write fails. */
-  @Test
-  public void testWritingFailsBadElement() throws Exception {
-    final String table = "TEST-TABLE";
-    final String key = "KEY";
-    service.createTable(table);
-
-    TestPipeline p = TestPipeline.create();
-    p.apply(Create.of(makeBadWrite(key)).withCoder(bigtableCoder))
-        .apply(defaultWrite.withTableId(table));
-
-    thrown.expect(PipelineExecutionException.class);
-    thrown.expectCause(Matchers.<Throwable>instanceOf(IOException.class));
-    thrown.expectMessage("At least 1 errors occurred writing to Bigtable. First 1 errors:");
-    thrown.expectMessage("Error mutating row " + key + " with mutations []: cell value missing");
-    p.run();
-  }
-
-  /** Tests that a configured write exposes its table id and options as display data. */
-  @Test
-  public void testWritingDisplayData() {
-    BigtableIO.Write write =
-        BigtableIO.write().withTableId("fooTable").withBigtableOptions(BIGTABLE_OPTIONS);
-    DisplayData items = DisplayData.from(write);
-
-    assertThat(items, hasDisplayItem("tableId", "fooTable"));
-    // BigtableIO adds a user agent to the options, so only the key is asserted, not the value.
-    assertThat(items, hasDisplayItem(hasKey("bigtableOptions")));
-  }
-
-  ////////////////////////////////////////////////////////////////////////////////////////////
-  // Fixed one-family, one-column schema used by makeRow for every row of the fake tables.
-  private static final String COLUMN_FAMILY_NAME = "family";
-  private static final ByteString COLUMN_NAME = ByteString.copyFromUtf8("column");
-  private static final Column TEST_COLUMN =
-      Column.newBuilder().setQualifier(COLUMN_NAME).build();
-  private static final Family TEST_FAMILY =
-      Family.newBuilder().setName(COLUMN_FAMILY_NAME).build();
-
-  /**
-   * Builds the {@link Row} that a read of a test table returns for one stored key/value pair.
-   *
-   * <p>The row has a single family ({@code TEST_FAMILY}) holding a single column
-   * ({@code TEST_COLUMN}) with one cell containing {@code value}.
-   */
-  private static Row makeRow(ByteString key, ByteString value) {
-    Column.Builder newColumn = TEST_COLUMN.toBuilder().addCells(Cell.newBuilder().setValue(value));
-    return Row.newBuilder()
-        .setKey(key)
-        .addFamilies(TEST_FAMILY.toBuilder().addColumns(newColumn))
-        .build();
-  }
-
-  /**
-   * Creates {@code tableId} in the fake service and fills it with {@code numRows} rows.
-   *
-   * <p>Keys are {@code key%09d} and values {@code value%09d}. The zero padding makes
-   * lexicographic key order match numeric order.
-   *
-   * @return the expected {@link Row Rows}, in ascending key order
-   */
-  private static List<Row> makeTableData(String tableId, int numRows) {
-    service.createTable(tableId);
-    Map<ByteString, ByteString> storage = service.getTable(tableId);
-    List<Row> expected = new ArrayList<>(numRows);
-    int i = 0;
-    while (i < numRows) {
-      ByteString rowKey = ByteString.copyFromUtf8(String.format("key%09d", i));
-      ByteString rowValue = ByteString.copyFromUtf8(String.format("value%09d", i));
-      storage.put(rowKey, rowValue);
-      expected.add(makeRow(rowKey, rowValue));
-      i++;
-    }
-    return expected;
-  }
-
-
-  /**
-   * A {@link BigtableService} implementation that stores tables and their contents in memory.
-   */
-  private static class FakeBigtableService implements BigtableService {
-    private final Map<String, SortedMap<ByteString, ByteString>> tables = new HashMap<>();
-    private final Map<String, List<SampleRowKeysResponse>> sampleRowKeys = new HashMap<>();
-
-    @Nullable
-    public SortedMap<ByteString, ByteString> getTable(String tableId) {
-      return tables.get(tableId);
-    }
-
-    public ByteKeyRange getTableRange(String tableId) {
-      verifyTableExists(tableId);
-      SortedMap<ByteString, ByteString> data = tables.get(tableId);
-      return ByteKeyRange.of(ByteKey.of(data.firstKey()), ByteKey.of(data.lastKey()));
-    }
-
-    public void createTable(String tableId) {
-      tables.put(tableId, new TreeMap<ByteString, ByteString>(new ByteStringComparator()));
-    }
-
-    @Override
-    public boolean tableExists(String tableId) {
-      return tables.containsKey(tableId);
-    }
-
-    public void verifyTableExists(String tableId) {
-      checkArgument(tableExists(tableId), "Table %s does not exist", tableId);
-    }
-
-    @Override
-    public FakeBigtableReader createReader(BigtableSource source) {
-      return new FakeBigtableReader(source);
-    }
-
-    @Override
-    public FakeBigtableWriter openForWriting(String tableId) {
-      return new FakeBigtableWriter(tableId);
-    }
-
-    @Override
-    public List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) {
-      List<SampleRowKeysResponse> samples = sampleRowKeys.get(source.getTableId());
-      checkArgument(samples != null, "No samples found for table %s", source.getTableId());
-      return samples;
-    }
-
-    /** Sets up the sample row keys for the specified table. */
-    void setupSampleRowKeys(String tableId, int numSamples, long bytesPerRow) {
-      verifyTableExists(tableId);
-      checkArgument(numSamples > 0, "Number of samples must be positive: %s", numSamples);
-      checkArgument(bytesPerRow > 0, "Bytes/Row must be positive: %s", bytesPerRow);
-
-      ImmutableList.Builder<SampleRowKeysResponse> ret = ImmutableList.builder();
-      SortedMap<ByteString, ByteString> rows = getTable(tableId);
-      int currentSample = 1;
-      int rowsSoFar = 0;
-      for (Map.Entry<ByteString, ByteString> entry : rows.entrySet()) {
-        if (((double) rowsSoFar) / rows.size() >= ((double) currentSample) / numSamples) {
-          // add the sample with the total number of bytes in the table before this key.
-          ret.add(
-              SampleRowKeysResponse.newBuilder()
-                  .setRowKey(entry.getKey())
-                  .setOffsetBytes(rowsSoFar * bytesPerRow)
-                  .build());
-          // Move on to next sample
-          currentSample++;
-        }
-        ++rowsSoFar;
-      }
-
-      // Add the last sample indicating the end of the table, with all rows before it.
-      ret.add(SampleRowKeysResponse.newBuilder().setOffsetBytes(rows.size() * bytesPerRow).build());
-      sampleRowKeys.put(tableId, ret.build());
-    }
-  }
-
-  /**
-   * A {@link BigtableService.Reader} implementation that reads from the static instance of
-   * {@link FakeBigtableService} stored in {@link #service}.
-   *
-   * <p>Only rows inside the source's key range are returned. The only supported {@link RowFilter}
-   * feature is a row-key regex filter. A filter with an empty row-key regex is rejected at
-   * construction, and any other filter settings are ignored.
-   */
-  private static class FakeBigtableReader implements BigtableService.Reader {
-    private final BigtableSource source;
-    private final Predicate<ByteString> filter;
-    private Iterator<Map.Entry<ByteString, ByteString>> rows;
-    private Row currentRow;
-
-    /**
-     * @throws IllegalArgumentException if the filter has no row-key regex, or if the source's
-     *     table does not exist
-     */
-    public FakeBigtableReader(BigtableSource source) {
-      this.source = source;
-      if (source.getRowFilter() == null) {
-        filter = Predicates.alwaysTrue();
-      } else {
-        ByteString keyRegex = source.getRowFilter().getRowKeyRegexFilter();
-        checkArgument(!keyRegex.isEmpty(), "Only RowKeyRegexFilter is supported");
-        filter = new KeyMatchesRegex(keyRegex.toStringUtf8());
-      }
-      service.verifyTableExists(source.getTableId());
-    }
-
-    @Override
-    public boolean start() {
-      rows = service.tables.get(source.getTableId()).entrySet().iterator();
-      return advance();
-    }
-
-    /**
-     * Moves to the next row that passes the filter and lies in the source's key range.
-     *
-     * @return {@code true} if such a row was found, {@code false} once the table is exhausted
-     */
-    @Override
-    public boolean advance() {
-      while (rows.hasNext()) {
-        Map.Entry<ByteString, ByteString> entry = rows.next();
-        // The row filter is evaluated first; the range check runs only for matching keys.
-        if (filter.apply(entry.getKey())
-            && source.getRange().containsKey(ByteKey.of(entry.getKey()))) {
-          currentRow = makeRow(entry.getKey(), entry.getValue());
-          return true;
-        }
-      }
-      currentRow = null;
-      return false;
-    }
-
-    /** @throws NoSuchElementException if there is no current row */
-    @Override
-    public Row getCurrentRow() {
-      if (currentRow == null) {
-        throw new NoSuchElementException();
-      }
-      return currentRow;
-    }
-
-    @Override
-    public void close() {
-      rows = null;
-      currentRow = null;
-    }
-  }
-
-  /**
-   * A {@link BigtableService.Writer} that applies mutations to the static instance of
-   * {@link FakeBigtableService} stored in {@link #service}.
-   *
-   * <p>Only {@link SetCell} mutations are understood, and only their value is stored; the column
-   * family and qualifier are ignored. If a mutation's {@link SetCell} value is empty, which
-   * includes a mutation with no {@link SetCell} at all, the returned {@link ListenableFuture}
-   * fails with an {@link IOException}. Mutations earlier in the same record stay applied.
-   */
-  private static class FakeBigtableWriter implements BigtableService.Writer {
-    private final String tableId;
-
-    public FakeBigtableWriter(String tableId) {
-      this.tableId = tableId;
-    }
-
-    @Override
-    public ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record) {
-      service.verifyTableExists(tableId);
-      ByteString rowKey = record.getKey();
-      Map<ByteString, ByteString> destination = service.getTable(tableId);
-      Iterator<Mutation> pending = record.getValue().iterator();
-      while (pending.hasNext()) {
-        ByteString newValue = pending.next().getSetCell().getValue();
-        if (newValue.isEmpty()) {
-          return Futures.immediateFailedCheckedFuture(new IOException("cell value missing"));
-        }
-        destination.put(rowKey, newValue);
-      }
-      return Futures.immediateFuture(Empty.getDefaultInstance());
-    }
-
-    @Override
-    public void close() {}
-  }
-
-  /** A serializable comparator for ByteString. Used to make row samples. */
-  private static final class ByteStringComparator implements Comparator<ByteString>, Serializable {
-    @Override
-    public int compare(ByteString o1, ByteString o2) {
-      return ByteKey.of(o1).compareTo(ByteKey.of(o2));
-    }
-  }
-}


[3/3] incubator-beam git commit: Closes #97

Posted by dh...@apache.org.
Closes #97


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aa43ec0b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aa43ec0b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aa43ec0b

Branch: refs/heads/master
Commit: aa43ec0b041b21a49601efcb3699456b52064f69
Parents: 9746f0d d138ae5
Author: Dan Halperin <dh...@google.com>
Authored: Tue Apr 26 14:08:55 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 26 14:08:55 2016 -0700

----------------------------------------------------------------------
 pom.xml                                         |    1 -
 sdks/java/core/pom.xml                          |   25 -
 .../apache/beam/sdk/io/bigtable/BigtableIO.java | 1016 ------------------
 .../beam/sdk/io/bigtable/BigtableService.java   |  111 --
 .../sdk/io/bigtable/BigtableServiceImpl.java    |  244 -----
 .../beam/sdk/io/bigtable/package-info.java      |   23 -
 .../beam/sdk/io/bigtable/BigtableIOTest.java    |  729 -------------
 sdks/java/io/google-cloud-platform/pom.xml      |   96 ++
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    | 1016 ++++++++++++++++++
 .../sdk/io/gcp/bigtable/BigtableService.java    |  111 ++
 .../io/gcp/bigtable/BigtableServiceImpl.java    |  244 +++++
 .../beam/sdk/io/gcp/bigtable/package-info.java  |   23 +
 .../sdk/io/gcp/bigtable/BigtableIOTest.java     |  729 +++++++++++++
 sdks/java/io/pom.xml                            |    1 +
 14 files changed, 2220 insertions(+), 2149 deletions(-)
----------------------------------------------------------------------